sync with Python 3.11

This commit is contained in:
Michael R. Crusoe 2022-12-23 11:27:46 +01:00
parent 9ef8594837
commit b8b7eb8ecd
No known key found for this signature in database
GPG key ID: 3C26763F6C67E6E2
16 changed files with 3501 additions and 169 deletions

View file

@ -8,7 +8,7 @@ jobs:
strategy:
max-parallel: 5
matrix:
python-version: [3.6, 3.7, 3.8, 3.9, '3.10', 'pypy-3.8']
python-version: [3.6, 3.7, 3.8, 3.9, '3.10', '3.11', 'pypy-3.8']
steps:
- uses: actions/checkout@v2

View file

@ -1,8 +1,8 @@
"""contextlib2 - backports and enhancements to the contextlib module"""
import abc
import os
import sys
import warnings
import _collections_abc
from collections import deque
from functools import wraps
@ -17,16 +17,17 @@ except ImportError:
class GenericAlias:
pass
__all__ = ["asynccontextmanager", "contextmanager", "closing", "nullcontext",
"AbstractContextManager", "AbstractAsyncContextManager",
"AsyncExitStack", "ContextDecorator", "ExitStack",
"redirect_stdout", "redirect_stderr", "suppress", "aclosing"]
"redirect_stdout", "redirect_stderr", "suppress", "aclosing",
"chdir"]
# Backwards compatibility
__all__ += ["ContextStack"]
class AbstractContextManager(abc.ABC):
"""An abstract base class for context managers."""
__class_getitem__ = classmethod(GenericAlias)
@ -143,18 +144,20 @@ class _GeneratorContextManagerBase:
# for the class instead.
# See http://bugs.python.org/issue19404 for more details.
class _GeneratorContextManager(_GeneratorContextManagerBase,
AbstractContextManager,
ContextDecorator):
"""Helper for @contextmanager decorator."""
def _recreate_cm(self):
# _GCM instances are one-shot context managers, so the
# _GCMB instances are one-shot context managers, so the
# CM must be recreated each time a decorated function is
# called
return self.__class__(self.func, self.args, self.kwds)
class _GeneratorContextManager(
_GeneratorContextManagerBase,
AbstractContextManager,
ContextDecorator,
):
"""Helper for @contextmanager decorator."""
def __enter__(self):
# do not keep args and kwds alive unnecessarily
# they are only needed for recreation, which is not possible anymore
@ -164,8 +167,8 @@ class _GeneratorContextManager(_GeneratorContextManagerBase,
except StopIteration:
raise RuntimeError("generator didn't yield") from None
def __exit__(self, type, value, traceback):
if type is None:
def __exit__(self, typ, value, traceback):
if typ is None:
try:
next(self.gen)
except StopIteration:
@ -176,9 +179,9 @@ class _GeneratorContextManager(_GeneratorContextManagerBase,
if value is None:
# Need to force instantiation so we can reliably
# tell if we get the same exception back
value = type()
value = typ()
try:
self.gen.throw(type, value, traceback)
self.gen.throw(typ, value, traceback)
except StopIteration as exc:
# Suppress StopIteration *unless* it's the same exception that
# was passed to throw(). This prevents a StopIteration
@ -187,43 +190,45 @@ class _GeneratorContextManager(_GeneratorContextManagerBase,
except RuntimeError as exc:
# Don't re-raise the passed in exception. (issue27122)
if exc is value:
exc.__traceback__ = traceback
return False
# Likewise, avoid suppressing if a StopIteration exception
# Avoid suppressing if a StopIteration exception
# was passed to throw() and later wrapped into a RuntimeError
# (see PEP 479).
if type is StopIteration and exc.__cause__ is value:
# (see PEP 479 for sync generators; async generators also
# have this behavior). But do this only if the exception wrapped
# by the RuntimeError is actually Stop(Async)Iteration (see
# issue29692).
if (
isinstance(value, StopIteration)
and exc.__cause__ is value
):
exc.__traceback__ = traceback
return False
raise
except:
except BaseException as exc:
# only re-raise if it's *not* the exception that was
# passed to throw(), because __exit__() must not raise
# an exception unless __exit__() itself failed. But throw()
# has to raise the exception to signal propagation, so this
# fixes the impedance mismatch between the throw() protocol
# and the __exit__() protocol.
#
# This cannot use 'except BaseException as exc' (as in the
# async implementation) to maintain compatibility with
# Python 2, where old-style class exceptions are not caught
# by 'except BaseException'.
if sys.exc_info()[1] is value:
return False
raise
if exc is not value:
raise
exc.__traceback__ = traceback
return False
raise RuntimeError("generator didn't stop after throw()")
class _AsyncGeneratorContextManager(_GeneratorContextManagerBase,
AbstractAsyncContextManager,
AsyncContextDecorator):
"""Helper for @asynccontextmanager."""
def _recreate_cm(self):
# _AGCM instances are one-shot context managers, so the
# ACM must be recreated each time a decorated function is
# called
return self.__class__(self.func, self.args, self.kwds)
class _AsyncGeneratorContextManager(
_GeneratorContextManagerBase,
AbstractAsyncContextManager,
AsyncContextDecorator,
):
"""Helper for @asynccontextmanager decorator."""
async def __aenter__(self):
# do not keep args and kwds alive unnecessarily
# they are only needed for recreation, which is not possible anymore
del self.args, self.kwds, self.func
try:
return await self.gen.__anext__()
except StopAsyncIteration:
@ -234,35 +239,48 @@ class _AsyncGeneratorContextManager(_GeneratorContextManagerBase,
try:
await self.gen.__anext__()
except StopAsyncIteration:
return
return False
else:
raise RuntimeError("generator didn't stop")
else:
if value is None:
# Need to force instantiation so we can reliably
# tell if we get the same exception back
value = typ()
# See _GeneratorContextManager.__exit__ for comments on subtleties
# in this implementation
try:
await self.gen.athrow(typ, value, traceback)
raise RuntimeError("generator didn't stop after athrow()")
except StopAsyncIteration as exc:
# Suppress StopIteration *unless* it's the same exception that
# was passed to throw(). This prevents a StopIteration
# raised inside the "with" statement from being suppressed.
return exc is not value
except RuntimeError as exc:
# Don't re-raise the passed in exception. (issue27122)
if exc is value:
return False
# Avoid suppressing if a StopIteration exception
# was passed to throw() and later wrapped into a RuntimeError
# Avoid suppressing if a Stop(Async)Iteration exception
# was passed to athrow() and later wrapped into a RuntimeError
# (see PEP 479 for sync generators; async generators also
# have this behavior). But do this only if the exception wrapped
# by the RuntimeError is actually Stop(Async)Iteration (see
# issue29692).
if isinstance(value, (StopIteration, StopAsyncIteration)):
if exc.__cause__ is value:
return False
if (
isinstance(value, (StopIteration, StopAsyncIteration))
and exc.__cause__ is value
):
return False
raise
except BaseException as exc:
# only re-raise if it's *not* the exception that was
# passed to throw(), because __exit__() must not raise
# an exception unless __exit__() itself failed. But throw()
# has to raise the exception to signal propagation, so this
# fixes the impedance mismatch between the throw() protocol
# and the __exit__() protocol.
if exc is not value:
raise
return False
raise RuntimeError("generator didn't stop after athrow()")
def contextmanager(func):
@ -505,9 +523,14 @@ class _BaseExitStack:
"""
# We look up the special methods on the type to match the with
# statement.
_cm_type = type(cm)
_exit = _cm_type.__exit__
result = _cm_type.__enter__(cm)
cls = type(cm)
try:
_enter = cls.__enter__
_exit = cls.__exit__
except AttributeError:
raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does "
f"not support the context manager protocol") from None
result = _enter(cm)
self._push_cm_exit(cm, _exit)
return result
@ -565,10 +588,10 @@ class ExitStack(_BaseExitStack, AbstractContextManager):
# Context may not be correct, so find the end of the chain
while 1:
exc_context = new_exc.__context__
if exc_context is old_exc:
if exc_context is None or exc_context is old_exc:
# Context is already set correctly (see issue 20317)
return
if exc_context is None or exc_context is frame_exc:
if exc_context is frame_exc:
break
new_exc = exc_context
# Change the end of the chain to point to the exception
@ -641,9 +664,15 @@ class AsyncExitStack(_BaseExitStack, AbstractAsyncContextManager):
If successful, also pushes its __aexit__ method as a callback and
returns the result of the __aenter__ method.
"""
_cm_type = type(cm)
_exit = _cm_type.__aexit__
result = await _cm_type.__aenter__(cm)
cls = type(cm)
try:
_enter = cls.__aenter__
_exit = cls.__aexit__
except AttributeError:
raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does "
f"not support the asynchronous context manager protocol"
) from None
result = await _enter(cm)
self._push_async_cm_exit(cm, _exit)
return result
@ -708,10 +737,10 @@ class AsyncExitStack(_BaseExitStack, AbstractAsyncContextManager):
# Context may not be correct, so find the end of the chain
while 1:
exc_context = new_exc.__context__
if exc_context is old_exc:
if exc_context is None or exc_context is old_exc:
# Context is already set correctly (see issue 20317)
return
if exc_context is None or exc_context is frame_exc:
if exc_context is frame_exc:
break
new_exc = exc_context
# Change the end of the chain to point to the exception
@ -779,6 +808,19 @@ class nullcontext(AbstractContextManager, AbstractAsyncContextManager):
pass
class chdir(AbstractContextManager):
"""Non thread-safe context manager to change the current working directory."""
def __init__(self, path):
self.path = path
self._old_cwd = []
def __enter__(self):
self._old_cwd.append(os.getcwd())
os.chdir(self.path)
def __exit__(self, *excinfo):
os.chdir(self._old_cwd.pop())
# Preserve backwards compatibility
class ContextStack(ExitStack):
"""Backwards compatibility alias for ExitStack"""

View file

@ -1,24 +1,14 @@
# Type hints copied from the typeshed project under the Apache License 2.0
# https://github.com/python/typeshed/blob/64c85cdd449ccaff90b546676220c9ecfa6e697f/LICENSE
import abc
import sys
from ._typeshed import Self
from ._typeshed import Self, StrOrBytesPath
from abc import abstractmethod
from collections.abc import AsyncGenerator, AsyncIterator, Awaitable, Callable, Generator, Iterator
from types import TracebackType
from typing import (
IO,
Any,
AsyncContextManager,
AsyncIterator,
Awaitable,
Callable,
ContextManager,
Iterator,
Optional,
Type,
TypeVar,
overload,
)
from typing_extensions import ParamSpec, Protocol
from typing import IO, Any, Generic, Protocol, TypeVar, overload, runtime_checkable
from typing_extensions import ParamSpec, TypeAlias
# contextlib2 API adaptation notes:
# * the various 'if True:' guards replace sys.version checks in the original
@ -26,98 +16,137 @@ from typing_extensions import ParamSpec, Protocol
# * deliberately omitted APIs are listed in `dev/mypy.allowlist`
# (e.g. deprecated experimental APIs that never graduated to the stdlib)
AbstractContextManager = ContextManager
if True:
AbstractAsyncContextManager = AsyncContextManager
_T = TypeVar("_T")
_T_co = TypeVar("_T_co", covariant=True)
_T_io = TypeVar("_T_io", bound=Optional[IO[str]])
_T_io = TypeVar("_T_io", bound=IO[str] | None)
_F = TypeVar("_F", bound=Callable[..., Any])
_P = ParamSpec("_P")
_ExitFunc = Callable[[Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]], bool]
_CM_EF = TypeVar("_CM_EF", ContextManager[Any], _ExitFunc)
_ExitFunc: TypeAlias = Callable[[type[BaseException] | None, BaseException | None, TracebackType | None], bool | None]
_CM_EF = TypeVar("_CM_EF", bound=AbstractContextManager[Any] | _ExitFunc)
class _GeneratorContextManager(ContextManager[_T_co]):
@runtime_checkable
class AbstractContextManager(Protocol[_T_co]):
def __enter__(self) -> _T_co: ...
@abstractmethod
def __exit__(
self, __exc_type: type[BaseException] | None, __exc_value: BaseException | None, __traceback: TracebackType | None
) -> bool | None: ...
@runtime_checkable
class AbstractAsyncContextManager(Protocol[_T_co]):
async def __aenter__(self) -> _T_co: ...
@abstractmethod
async def __aexit__(
self, __exc_type: type[BaseException] | None, __exc_value: BaseException | None, __traceback: TracebackType | None
) -> bool | None: ...
class ContextDecorator:
def __call__(self, func: _F) -> _F: ...
# type ignore to deal with incomplete ParamSpec support in mypy
def contextmanager(func: Callable[_P, Iterator[_T]]) -> Callable[_P, _GeneratorContextManager[_T]]: ... # type: ignore
class _GeneratorContextManager(AbstractContextManager[_T_co], ContextDecorator, Generic[_T_co]):
# __init__ and all instance attributes are actually inherited from _GeneratorContextManagerBase
# _GeneratorContextManagerBase is more trouble than it's worth to include in the stub; see #6676
def __init__(self, func: Callable[..., Iterator[_T_co]], args: tuple[Any, ...], kwds: dict[str, Any]) -> None: ...
gen: Generator[_T_co, Any, Any]
func: Callable[..., Generator[_T_co, Any, Any]]
args: tuple[Any, ...]
kwds: dict[str, Any]
def __exit__(
self, typ: type[BaseException] | None, value: BaseException | None, traceback: TracebackType | None
) -> bool | None: ...
def contextmanager(func: Callable[_P, Iterator[_T_co]]) -> Callable[_P, _GeneratorContextManager[_T_co]]: ...
if True:
def asynccontextmanager(func: Callable[_P, AsyncIterator[_T]]) -> Callable[_P, AsyncContextManager[_T]]: ... # type: ignore
_AF = TypeVar("_AF", bound=Callable[..., Awaitable[Any]])
class AsyncContextDecorator:
def __call__(self, func: _AF) -> _AF: ...
class _AsyncGeneratorContextManager(AbstractAsyncContextManager[_T_co], AsyncContextDecorator, Generic[_T_co]):
# __init__ and these attributes are actually defined in the base class _GeneratorContextManagerBase,
# which is more trouble than it's worth to include in the stub (see #6676)
def __init__(self, func: Callable[..., AsyncIterator[_T_co]], args: tuple[Any, ...], kwds: dict[str, Any]) -> None: ...
gen: AsyncGenerator[_T_co, Any]
func: Callable[..., AsyncGenerator[_T_co, Any]]
args: tuple[Any, ...]
kwds: dict[str, Any]
async def __aexit__(
self, typ: type[BaseException] | None, value: BaseException | None, traceback: TracebackType | None
) -> bool | None: ...
def asynccontextmanager(func: Callable[_P, AsyncIterator[_T_co]]) -> Callable[_P, _AsyncGeneratorContextManager[_T_co]]: ...
class _SupportsClose(Protocol):
def close(self) -> object: ...
_SupportsCloseT = TypeVar("_SupportsCloseT", bound=_SupportsClose)
class closing(ContextManager[_SupportsCloseT]):
class closing(AbstractContextManager[_SupportsCloseT]):
def __init__(self, thing: _SupportsCloseT) -> None: ...
def __exit__(self, *exc_info: object) -> None: ...
if True:
class _SupportsAclose(Protocol):
def aclose(self) -> Awaitable[object]: ...
_SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose)
class aclosing(AsyncContextManager[_SupportsAcloseT]):
def __init__(self, thing: _SupportsAcloseT) -> None: ...
_AF = TypeVar("_AF", bound=Callable[..., Awaitable[Any]])
class AsyncContextDecorator:
def __call__(self, func: _AF) -> _AF: ...
class suppress(ContextManager[None]):
def __init__(self, *exceptions: Type[BaseException]) -> None: ...
class aclosing(AbstractAsyncContextManager[_SupportsAcloseT]):
def __init__(self, thing: _SupportsAcloseT) -> None: ...
async def __aexit__(self, *exc_info: object) -> None: ...
class suppress(AbstractContextManager[None]):
def __init__(self, *exceptions: type[BaseException]) -> None: ...
def __exit__(
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
self, exctype: type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None
) -> bool: ...
class redirect_stdout(ContextManager[_T_io]):
class _RedirectStream(AbstractContextManager[_T_io]):
def __init__(self, new_target: _T_io) -> None: ...
def __exit__(
self, exctype: type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None
) -> None: ...
class redirect_stderr(ContextManager[_T_io]):
def __init__(self, new_target: _T_io) -> None: ...
class redirect_stdout(_RedirectStream[_T_io]): ...
class redirect_stderr(_RedirectStream[_T_io]): ...
class ContextDecorator:
def __call__(self, func: _F) -> _F: ...
class ExitStack(ContextManager[ExitStack]):
def __init__(self) -> None: ...
def enter_context(self, cm: ContextManager[_T]) -> _T: ...
# In reality this is a subclass of `AbstractContextManager`;
# see #7961 for why we don't do that in the stub
class ExitStack(metaclass=abc.ABCMeta):
def enter_context(self, cm: AbstractContextManager[_T]) -> _T: ...
def push(self, exit: _CM_EF) -> _CM_EF: ...
def callback(self, callback: Callable[..., Any], *args: Any, **kwds: Any) -> Callable[..., Any]: ...
def callback(self, __callback: Callable[_P, _T], *args: _P.args, **kwds: _P.kwargs) -> Callable[_P, _T]: ...
def pop_all(self: Self) -> Self: ...
def close(self) -> None: ...
def __enter__(self: Self) -> Self: ...
def __exit__(
self,
__exc_type: Optional[Type[BaseException]],
__exc_value: Optional[BaseException],
__traceback: Optional[TracebackType],
self, __exc_type: type[BaseException] | None, __exc_value: BaseException | None, __traceback: TracebackType | None
) -> bool: ...
if True:
_ExitCoroFunc = Callable[[Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]], Awaitable[bool]]
_CallbackCoroFunc = Callable[..., Awaitable[Any]]
_ACM_EF = TypeVar("_ACM_EF", AsyncContextManager[Any], _ExitCoroFunc)
class AsyncExitStack(AsyncContextManager[AsyncExitStack]):
def __init__(self) -> None: ...
def enter_context(self, cm: ContextManager[_T]) -> _T: ...
def enter_async_context(self, cm: AsyncContextManager[_T]) -> Awaitable[_T]: ...
def push(self, exit: _CM_EF) -> _CM_EF: ...
def push_async_exit(self, exit: _ACM_EF) -> _ACM_EF: ...
def callback(self, callback: Callable[..., Any], *args: Any, **kwds: Any) -> Callable[..., Any]: ...
def push_async_callback(self, callback: _CallbackCoroFunc, *args: Any, **kwds: Any) -> _CallbackCoroFunc: ...
def pop_all(self: Self) -> Self: ...
def aclose(self) -> Awaitable[None]: ...
def __aenter__(self: Self) -> Awaitable[Self]: ...
def __aexit__(
self,
__exc_type: Optional[Type[BaseException]],
__exc_value: Optional[BaseException],
__traceback: Optional[TracebackType],
) -> Awaitable[bool]: ...
_ExitCoroFunc: TypeAlias = Callable[
[type[BaseException] | None, BaseException | None, TracebackType | None], Awaitable[bool | None]
]
_ACM_EF = TypeVar("_ACM_EF", bound=AbstractAsyncContextManager[Any] | _ExitCoroFunc)
# In reality this is a subclass of `AbstractAsyncContextManager`;
# see #7961 for why we don't do that in the stub
class AsyncExitStack(metaclass=abc.ABCMeta):
def enter_context(self, cm: AbstractContextManager[_T]) -> _T: ...
async def enter_async_context(self, cm: AbstractAsyncContextManager[_T]) -> _T: ...
def push(self, exit: _CM_EF) -> _CM_EF: ...
def push_async_exit(self, exit: _ACM_EF) -> _ACM_EF: ...
def callback(self, __callback: Callable[_P, _T], *args: _P.args, **kwds: _P.kwargs) -> Callable[_P, _T]: ...
def push_async_callback(
self, __callback: Callable[_P, Awaitable[_T]], *args: _P.args, **kwds: _P.kwargs
) -> Callable[_P, Awaitable[_T]]: ...
def pop_all(self: Self) -> Self: ...
async def aclose(self) -> None: ...
async def __aenter__(self: Self) -> Self: ...
async def __aexit__(
self, __exc_type: type[BaseException] | None, __exc_value: BaseException | None, __traceback: TracebackType | None
) -> bool: ...
if True:
class nullcontext(AbstractContextManager[_T], AbstractAsyncContextManager[_T]):
@ -127,6 +156,15 @@ if True:
@overload
def __init__(self: nullcontext[_T], enter_result: _T) -> None: ...
def __enter__(self) -> _T: ...
def __exit__(self, *exctype: Any) -> None: ...
def __exit__(self, *exctype: object) -> None: ...
async def __aenter__(self) -> _T: ...
async def __aexit__(self, *exctype: Any) -> None: ...
async def __aexit__(self, *exctype: object) -> None: ...
if True:
_T_fd_or_any_path = TypeVar("_T_fd_or_any_path", bound=int | StrOrBytesPath)
class chdir(AbstractContextManager[None], Generic[_T_fd_or_any_path]):
path: _T_fd_or_any_path
def __init__(self, path: _T_fd_or_any_path) -> None: ...
def __enter__(self) -> None: ...
def __exit__(self, *excinfo: object) -> None: ...

View file

@ -1,5 +1,9 @@
from typing import TypeVar # pragma: no cover
from os import PathLike
from typing import TypeVar
from typing_extensions import TypeAlias
# Use for "self" annotations:
# def __enter__(self: Self) -> Self: ...
Self = TypeVar("Self") # pragma: no cover
Self = TypeVar("Self") # noqa: Y001
StrOrBytesPath: TypeAlias = str | bytes | PathLike[str] | PathLike[bytes] # stable

View file

@ -1,3 +1,6 @@
contextlib2.AbstractContextManager.__class_getitem__
contextlib2.AbstractAsyncContextManager.__class_getitem__
contextlib2._typeshed.PathLike.__class_getitem__
# Deprecated APIs that never graduated to the standard library
contextlib2.ContextDecorator.refresh_cm
contextlib2.ContextStack

2
test/data/README Normal file
View file

@ -0,0 +1,2 @@
This empty directory serves as destination for temporary files
created by some tests, in particular, the test_codecmaps_* tests.

File diff suppressed because it is too large Load diff

View file

@ -1,4 +1,717 @@
"""Enough of the test.support.os_helper APIs to run the contextlib test suite"""
import collections.abc
import contextlib
import errno
import os
import re
import stat
import sys
import time
import unittest
import warnings
unlink = os.unlink
# Filename used for testing
if os.name == 'java':
# Jython disallows @ in module names
TESTFN_ASCII = '$test'
else:
TESTFN_ASCII = '@test'
# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())
# TESTFN_UNICODE is a non-ascii filename
TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
# In Mac OS X's VFS API file names are, by definition, canonically
# decomposed Unicode, encoded using UTF-8. See QA1173:
# http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
import unicodedata
TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
# skip win32s (0) or Windows 9x/ME (1)
if sys.getwindowsversion().platform >= 2:
# Different kinds of characters from various languages to minimize the
# probability that the whole name is encodable to MBCS (issue #9819)
TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
try:
TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
except UnicodeEncodeError:
pass
else:
print('WARNING: The filename %r CAN be encoded by the filesystem '
'encoding (%s). Unicode filename tests may not be effective'
% (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
TESTFN_UNENCODABLE = None
# macOS and Emscripten deny unencodable filenames (invalid utf-8)
elif sys.platform not in {'darwin', 'emscripten', 'wasi'}:
try:
# ascii and utf-8 cannot encode the byte 0xff
b'\xff'.decode(sys.getfilesystemencoding())
except UnicodeDecodeError:
# 0xff will be encoded using the surrogate character u+DCFF
TESTFN_UNENCODABLE = TESTFN_ASCII \
+ b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
else:
# File system encoding (eg. ISO-8859-* encodings) can encode
# the byte 0xff. Skip some unicode filename tests.
pass
# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or an empty string if there is no such character.
FS_NONASCII = ''
for character in (
# First try printable and common characters to have a readable filename.
# For each character, the encoding list are just example of encodings able
# to encode the character (the list is not exhaustive).
# U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
'\u00E6',
# U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
'\u0130',
# U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
'\u0141',
# U+03C6 (Greek Small Letter Phi): cp1253
'\u03C6',
# U+041A (Cyrillic Capital Letter Ka): cp1251
'\u041A',
# U+05D0 (Hebrew Letter Alef): Encodable to cp424
'\u05D0',
# U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
'\u060C',
# U+062A (Arabic Letter Teh): cp720
'\u062A',
# U+0E01 (Thai Character Ko Kai): cp874
'\u0E01',
# Then try more "special" characters. "special" because they may be
# interpreted or displayed differently depending on the exact locale
# encoding and the font.
# U+00A0 (No-Break Space)
'\u00A0',
# U+20AC (Euro Sign)
'\u20AC',
):
try:
# If Python is set up to use the legacy 'mbcs' in Windows,
# 'replace' error mode is used, and encode() returns b'?'
# for characters missing in the ANSI codepage
if os.fsdecode(os.fsencode(character)) != character:
raise UnicodeError
except UnicodeError:
pass
else:
FS_NONASCII = character
break
# Save the initial cwd
SAVEDCWD = os.getcwd()
# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
# b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
# accepts it to create a file or a directory, or don't accept to enter to
# such directory (when the bytes name is used). So test b'\xe7' first:
# it is not decodable from cp932.
b'\xe7w\xf0',
# undecodable from ASCII, UTF-8
b'\xff',
# undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
# and cp857
b'\xae\xd5'
# undecodable from UTF-8 (UNIX and Mac OS X)
b'\xed\xb2\x80', b'\xed\xb4\x80',
# undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
# cp1253, cp1254, cp1255, cp1257, cp1258
b'\x81\x98',
):
try:
name.decode(sys.getfilesystemencoding())
except UnicodeDecodeError:
TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
break
if FS_NONASCII:
TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII
else:
TESTFN_NONASCII = None
TESTFN = TESTFN_NONASCII or TESTFN_ASCII
def make_bad_fd():
"""
Create an invalid file descriptor by opening and closing a file and return
its fd.
"""
file = open(TESTFN, "wb")
try:
return file.fileno()
finally:
file.close()
unlink(TESTFN)
_can_symlink = None
def can_symlink():
global _can_symlink
if _can_symlink is not None:
return _can_symlink
# WASI / wasmtime prevents symlinks with absolute paths, see man
# openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute
# paths. Skip symlink tests on WASI for now.
src = os.path.abspath(TESTFN)
symlink_path = src + "can_symlink"
try:
os.symlink(src, symlink_path)
can = True
except (OSError, NotImplementedError, AttributeError):
can = False
else:
os.remove(symlink_path)
_can_symlink = can
return can
def skip_unless_symlink(test):
"""Skip decorator for tests that require functional symlink"""
ok = can_symlink()
msg = "Requires functional symlink implementation"
return test if ok else unittest.skip(msg)(test)
_can_xattr = None
def can_xattr():
import tempfile
global _can_xattr
if _can_xattr is not None:
return _can_xattr
if not hasattr(os, "setxattr"):
can = False
else:
import platform
tmp_dir = tempfile.mkdtemp()
tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
try:
with open(TESTFN, "wb") as fp:
try:
# TESTFN & tempfile may use different file systems with
# different capabilities
os.setxattr(tmp_fp, b"user.test", b"")
os.setxattr(tmp_name, b"trusted.foo", b"42")
os.setxattr(fp.fileno(), b"user.test", b"")
# Kernels < 2.6.39 don't respect setxattr flags.
kernel_version = platform.release()
m = re.match(r"2.6.(\d{1,2})", kernel_version)
can = m is None or int(m.group(1)) >= 39
except OSError:
can = False
finally:
unlink(TESTFN)
unlink(tmp_name)
rmdir(tmp_dir)
_can_xattr = can
return can
def skip_unless_xattr(test):
"""Skip decorator for tests that require functional extended attributes"""
ok = can_xattr()
msg = "no non-broken extended attribute support"
return test if ok else unittest.skip(msg)(test)
_can_chmod = None
def can_chmod():
global _can_chmod
if _can_chmod is not None:
return _can_chmod
if not hasattr(os, "chown"):
_can_chmod = False
return _can_chmod
try:
with open(TESTFN, "wb") as f:
try:
os.chmod(TESTFN, 0o777)
mode1 = os.stat(TESTFN).st_mode
os.chmod(TESTFN, 0o666)
mode2 = os.stat(TESTFN).st_mode
except OSError as e:
can = False
else:
can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2)
finally:
unlink(TESTFN)
_can_chmod = can
return can
def skip_unless_working_chmod(test):
"""Skip tests that require working os.chmod()
WASI SDK 15.0 cannot change file mode bits.
"""
ok = can_chmod()
msg = "requires working os.chmod()"
return test if ok else unittest.skip(msg)(test)
# Check whether the current effective user has the capability to override
# DAC (discretionary access control). Typically user root is able to
# bypass file read, write, and execute permission checks. The capability
# is independent of the effective user. See capabilities(7).
_can_dac_override = None
def can_dac_override():
global _can_dac_override
if not can_chmod():
_can_dac_override = False
if _can_dac_override is not None:
return _can_dac_override
try:
with open(TESTFN, "wb") as f:
os.chmod(TESTFN, 0o400)
try:
with open(TESTFN, "wb"):
pass
except OSError:
_can_dac_override = False
else:
_can_dac_override = True
finally:
unlink(TESTFN)
return _can_dac_override
def skip_if_dac_override(test):
ok = not can_dac_override()
msg = "incompatible with CAP_DAC_OVERRIDE"
return test if ok else unittest.skip(msg)(test)
def skip_unless_dac_override(test):
ok = can_dac_override()
msg = "requires CAP_DAC_OVERRIDE"
return test if ok else unittest.skip(msg)(test)
def unlink(filename):
try:
_unlink(filename)
except (FileNotFoundError, NotADirectoryError):
pass
if sys.platform.startswith("win"):
def _waitfor(func, pathname, waitall=False):
# Perform the operation
func(pathname)
# Now setup the wait loop
if waitall:
dirname = pathname
else:
dirname, name = os.path.split(pathname)
dirname = dirname or '.'
# Check for `pathname` to be removed from the filesystem.
# The exponential backoff of the timeout amounts to a total
# of ~1 second after which the deletion is probably an error
# anyway.
# Testing on an i7@4.3GHz shows that usually only 1 iteration is
# required when contention occurs.
timeout = 0.001
while timeout < 1.0:
# Note we are only testing for the existence of the file(s) in
# the contents of the directory regardless of any security or
# access rights. If we have made it this far, we have sufficient
# permissions to do that much using Python's equivalent of the
# Windows API FindFirstFile.
# Other Windows APIs can fail or give incorrect results when
# dealing with files that are pending deletion.
L = os.listdir(dirname)
if not (L if waitall else name in L):
return
# Increase the timeout and try again
time.sleep(timeout)
timeout *= 2
warnings.warn('tests may fail, delete still pending for ' + pathname,
RuntimeWarning, stacklevel=4)
def _unlink(filename):
_waitfor(os.unlink, filename)
def _rmdir(dirname):
_waitfor(os.rmdir, dirname)
def _rmtree(path):
from test.support import _force_run
def _rmtree_inner(path):
for name in _force_run(path, os.listdir, path):
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except OSError as exc:
print("support.rmtree(): os.lstat(%r) failed with %s"
% (fullname, exc),
file=sys.__stderr__)
mode = 0
if stat.S_ISDIR(mode):
_waitfor(_rmtree_inner, fullname, waitall=True)
_force_run(fullname, os.rmdir, fullname)
else:
_force_run(fullname, os.unlink, fullname)
_waitfor(_rmtree_inner, path, waitall=True)
_waitfor(lambda p: _force_run(p, os.rmdir, p), path)
def _longpath(path):
try:
import ctypes
except ImportError:
# No ctypes means we can't expands paths.
pass
else:
buffer = ctypes.create_unicode_buffer(len(path) * 2)
length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
len(buffer))
if length:
return buffer[:length]
return path
else:
_unlink = os.unlink
_rmdir = os.rmdir
def _rmtree(path):
import shutil
try:
shutil.rmtree(path)
return
except OSError:
pass
def _rmtree_inner(path):
from test.support import _force_run
for name in _force_run(path, os.listdir, path):
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except OSError:
mode = 0
if stat.S_ISDIR(mode):
_rmtree_inner(fullname)
_force_run(path, os.rmdir, fullname)
else:
_force_run(path, os.unlink, fullname)
_rmtree_inner(path)
os.rmdir(path)
def _longpath(path):
return path
def rmdir(dirname):
try:
_rmdir(dirname)
except FileNotFoundError:
pass
def rmtree(path):
try:
_rmtree(path)
except FileNotFoundError:
pass
@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
"""Return a context manager that creates a temporary directory.
Arguments:
path: the directory to create temporarily. If omitted or None,
defaults to creating a temporary directory using tempfile.mkdtemp.
quiet: if False (the default), the context manager raises an exception
on error. Otherwise, if the path is specified and cannot be
created, only a warning is issued.
"""
import tempfile
dir_created = False
if path is None:
path = tempfile.mkdtemp()
dir_created = True
path = os.path.realpath(path)
else:
try:
os.mkdir(path)
dir_created = True
except OSError as exc:
if not quiet:
raise
warnings.warn(f'tests may fail, unable to create '
f'temporary directory {path!r}: {exc}',
RuntimeWarning, stacklevel=3)
if dir_created:
pid = os.getpid()
try:
yield path
finally:
# In case the process forks, let only the parent remove the
# directory. The child has a different process id. (bpo-30028)
if dir_created and pid == os.getpid():
rmtree(path)
@contextlib.contextmanager
def change_cwd(path, quiet=False):
"""Return a context manager that changes the current working directory.
Arguments:
path: the directory to use as the temporary current working directory.
quiet: if False (the default), the context manager raises an exception
on error. Otherwise, it issues only a warning and keeps the current
working directory the same.
"""
saved_dir = os.getcwd()
try:
os.chdir(os.path.realpath(path))
except OSError as exc:
if not quiet:
raise
warnings.warn(f'tests may fail, unable to change the current working '
f'directory to {path!r}: {exc}',
RuntimeWarning, stacklevel=3)
try:
yield os.getcwd()
finally:
os.chdir(saved_dir)
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
"""
Context manager that temporarily creates and changes the CWD.
The function temporarily changes the current working directory
after creating a temporary directory in the current directory with
name *name*. If *name* is None, the temporary directory is
created using tempfile.mkdtemp.
If *quiet* is False (default) and it is not possible to
create or change the CWD, an error is raised. If *quiet* is True,
only a warning is raised and the original CWD is used.
"""
with temp_dir(path=name, quiet=quiet) as temp_path:
with change_cwd(temp_path, quiet=quiet) as cwd_dir:
yield cwd_dir
def create_empty_file(filename):
"""Create an empty file. If the file already exists, truncate it."""
fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
os.close(fd)
@contextlib.contextmanager
def open_dir_fd(path):
"""Open a file descriptor to a directory."""
assert os.path.isdir(path)
flags = os.O_RDONLY
if hasattr(os, "O_DIRECTORY"):
flags |= os.O_DIRECTORY
dir_fd = os.open(path, flags)
try:
yield dir_fd
finally:
os.close(dir_fd)
def fs_is_case_insensitive(directory):
"""Detects if the file system for the specified directory
is case-insensitive."""
import tempfile
with tempfile.NamedTemporaryFile(dir=directory) as base:
base_path = base.name
case_path = base_path.upper()
if case_path == base_path:
case_path = base_path.lower()
try:
return os.path.samefile(base_path, case_path)
except FileNotFoundError:
return False
class FakePath:
"""Simple implementing of the path protocol.
"""
def __init__(self, path):
self.path = path
def __repr__(self):
return f'<FakePath {self.path!r}>'
def __fspath__(self):
if (isinstance(self.path, BaseException) or
isinstance(self.path, type) and
issubclass(self.path, BaseException)):
raise self.path
else:
return self.path
def fd_count():
"""Count the number of open file descriptors.
"""
if sys.platform.startswith(('linux', 'freebsd', 'emscripten')):
try:
names = os.listdir("/proc/self/fd")
# Subtract one because listdir() internally opens a file
# descriptor to list the content of the /proc/self/fd/ directory.
return len(names) - 1
except FileNotFoundError:
pass
MAXFD = 256
if hasattr(os, 'sysconf'):
try:
MAXFD = os.sysconf("SC_OPEN_MAX")
except OSError:
pass
old_modes = None
if sys.platform == 'win32':
# bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
# on invalid file descriptor if Python is compiled in debug mode
try:
import msvcrt
msvcrt.CrtSetReportMode
except (AttributeError, ImportError):
# no msvcrt or a release build
pass
else:
old_modes = {}
for report_type in (msvcrt.CRT_WARN,
msvcrt.CRT_ERROR,
msvcrt.CRT_ASSERT):
old_modes[report_type] = msvcrt.CrtSetReportMode(report_type,
0)
try:
count = 0
for fd in range(MAXFD):
try:
# Prefer dup() over fstat(). fstat() can require input/output
# whereas dup() doesn't.
fd2 = os.dup(fd)
except OSError as e:
if e.errno != errno.EBADF:
raise
else:
os.close(fd2)
count += 1
finally:
if old_modes is not None:
for report_type in (msvcrt.CRT_WARN,
msvcrt.CRT_ERROR,
msvcrt.CRT_ASSERT):
msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
return count
if hasattr(os, "umask"):
@contextlib.contextmanager
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)
else:
@contextlib.contextmanager
def temp_umask(umask):
"""no-op on platforms without umask()"""
yield
class EnvironmentVarGuard(collections.abc.MutableMapping):
"""Class to help protect the environment variable properly. Can be used as
a context manager."""
def __init__(self):
self._environ = os.environ
self._changed = {}
def __getitem__(self, envvar):
return self._environ[envvar]
def __setitem__(self, envvar, value):
# Remember the initial value on the first access
if envvar not in self._changed:
self._changed[envvar] = self._environ.get(envvar)
self._environ[envvar] = value
def __delitem__(self, envvar):
# Remember the initial value on the first access
if envvar not in self._changed:
self._changed[envvar] = self._environ.get(envvar)
if envvar in self._environ:
del self._environ[envvar]
def keys(self):
return self._environ.keys()
def __iter__(self):
return iter(self._environ)
def __len__(self):
return len(self._environ)
def set(self, envvar, value):
self[envvar] = value
def unset(self, envvar):
del self[envvar]
def copy(self):
# We do what os.environ.copy() does.
return dict(self)
def __enter__(self):
return self
def __exit__(self, *ignore_exc):
for (k, v) in self._changed.items():
if v is None:
if k in self._environ:
del self._environ[k]
else:
self._environ[k] = v
os.environ = self._environ

View file

@ -1,15 +1,16 @@
"""Unit tests for contextlib.py, and other context managers."""
import io
import os
import sys
import tempfile
import threading
import traceback
import unittest
from contextlib2 import * # Tests __all__
from test import support
from test.support import os_helper
import weakref
import gc
class TestAbstractContextManager(unittest.TestCase):
@ -87,6 +88,32 @@ class ContextManagerTestCase(unittest.TestCase):
raise ZeroDivisionError()
self.assertEqual(state, [1, 42, 999])
def test_contextmanager_traceback(self):
@contextmanager
def f():
yield
try:
with f():
1/0
except ZeroDivisionError as e:
frames = traceback.extract_tb(e.__traceback__)
self.assertEqual(len(frames), 1)
self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
self.assertEqual(frames[0].line, '1/0')
# Repeat with RuntimeError (which goes through a different code path)
try:
with f():
raise NotImplementedError(42)
except NotImplementedError as e:
frames = traceback.extract_tb(e.__traceback__)
self.assertEqual(len(frames), 1)
self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
self.assertEqual(frames[0].line, 'raise NotImplementedError(42)')
def test_contextmanager_no_reraise(self):
@contextmanager
def whee():
@ -127,19 +154,22 @@ class ContextManagerTestCase(unittest.TestCase):
self.assertEqual(state, [1, 42, 999])
def test_contextmanager_except_stopiter(self):
stop_exc = StopIteration('spam')
@contextmanager
def woohoo():
yield
try:
with self.assertWarnsRegex(DeprecationWarning,
"StopIteration"):
with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail('StopIteration was suppressed')
class StopIterationSubclass(StopIteration):
pass
for stop_exc in (StopIteration('spam'), StopIterationSubclass('spam')):
with self.subTest(type=type(stop_exc)):
try:
with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail(f'{stop_exc} was suppressed')
def test_contextmanager_except_pep479(self):
code = """\
@ -230,7 +260,7 @@ def woohoo():
a = weakref.ref(a)
b = weakref.ref(b)
# Allow test to work with a non-refcounted GC
gc.collect(); gc.collect(); gc.collect()
support.gc_collect()
self.assertIsNone(a())
self.assertIsNone(b())
yield
@ -493,10 +523,14 @@ class TestContextDecorator(unittest.TestCase):
pass
def __exit__(self, *exc):
pass
with self.assertRaises(AttributeError):
with mycontext():
pass
if sys.version_info >= (3, 11):
with self.assertRaisesRegex(TypeError, 'the context manager'):
with mycontext():
pass
else:
with self.assertRaises(AttributeError):
with mycontext():
pass
def test_typo_exit(self):
@ -506,9 +540,15 @@ class TestContextDecorator(unittest.TestCase):
def __uxit__(self, *exc):
pass
with self.assertRaises(AttributeError):
with mycontext():
pass
if sys.version_info >= (3, 11):
with self.assertRaisesRegex(TypeError, 'the context manager.*__exit__'):
with mycontext():
pass
else:
with self.assertRaisesRegex(AttributeError, '__exit__'):
with mycontext():
pass
def test_contextdecorator_as_mixin(self):
@ -664,6 +704,25 @@ class TestBaseExitStack:
result.append(2)
self.assertEqual(result, [1, 2, 3, 4])
def test_enter_context_errors(self):
class LacksEnterAndExit:
pass
class LacksEnter:
def __exit__(self, *exc_info):
pass
class LacksExit:
def __enter__(self):
pass
with self.exit_stack() as stack:
with self.assertRaisesRegex(TypeError, 'the context manager'):
stack.enter_context(LacksEnterAndExit())
with self.assertRaisesRegex(TypeError, 'the context manager'):
stack.enter_context(LacksEnter())
with self.assertRaisesRegex(TypeError, 'the context manager'):
stack.enter_context(LacksExit())
self.assertFalse(stack._exit_callbacks)
def test_close(self):
result = []
with self.exit_stack() as stack:
@ -699,6 +758,38 @@ class TestBaseExitStack:
stack.push(lambda *exc: True)
1/0
def test_exit_exception_traceback(self):
# This test captures the current behavior of ExitStack so that we know
# if we ever unintendedly change it. It is not a statement of what the
# desired behavior is (for instance, we may want to remove some of the
# internal contextlib frames).
def raise_exc(exc):
raise exc
try:
with self.exit_stack() as stack:
stack.callback(raise_exc, ValueError)
1/0
except ValueError as e:
exc = e
self.assertIsInstance(exc, ValueError)
ve_frames = traceback.extract_tb(exc.__traceback__)
expected = \
[('test_exit_exception_traceback', 'with self.exit_stack() as stack:')] + \
self.callback_error_internal_frames + \
[('_exit_wrapper', 'callback(*args, **kwds)'),
('raise_exc', 'raise exc')]
self.assertEqual(
[(f.name, f.line) for f in ve_frames], expected)
self.assertIsInstance(exc.__context__, ZeroDivisionError)
zde_frames = traceback.extract_tb(exc.__context__.__traceback__)
self.assertEqual([(f.name, f.line) for f in zde_frames],
[('test_exit_exception_traceback', '1/0')])
def test_exit_exception_chaining_reference(self):
# Sanity check to make sure that ExitStack chaining matches
# actual nested with statements
@ -778,6 +869,40 @@ class TestBaseExitStack:
self.assertIsInstance(inner_exc, ValueError)
self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
def test_exit_exception_explicit_none_context(self):
# Ensure ExitStack chaining matches actual nested `with` statements
# regarding explicit __context__ = None.
class MyException(Exception):
pass
@contextmanager
def my_cm():
try:
yield
except BaseException:
exc = MyException()
try:
raise exc
finally:
exc.__context__ = None
@contextmanager
def my_cm_with_exit_stack():
with self.exit_stack() as stack:
stack.enter_context(my_cm())
yield stack
for cm in (my_cm, my_cm_with_exit_stack):
with self.subTest():
try:
with cm():
raise IndexError()
except MyException as exc:
self.assertIsNone(exc.__context__)
else:
self.fail("Expected IndexError, but no exception was raised")
def test_exit_exception_non_suppressing(self):
# http://bugs.python.org/issue19092
def raise_exc(exc):
@ -889,9 +1014,11 @@ class TestBaseExitStack:
def test_instance_bypass(self):
class Example(object): pass
cm = Example()
cm.__enter__ = object()
cm.__exit__ = object()
stack = self.exit_stack()
self.assertRaises(AttributeError, stack.enter_context, cm)
with self.assertRaisesRegex(TypeError, 'the context manager'):
stack.enter_context(cm)
stack.push(cm)
self.assertIs(stack._exit_callbacks[-1][1], cm)
@ -932,6 +1059,10 @@ class TestBaseExitStack:
class TestExitStack(TestBaseExitStack, unittest.TestCase):
exit_stack = ExitStack
callback_error_internal_frames = [
('__exit__', 'raise exc_details[1]'),
('__exit__', 'if cb(*exc_details):'),
]
class TestRedirectStream:
@ -1057,5 +1188,53 @@ class TestSuppress(unittest.TestCase):
1/0
self.assertTrue(outer_continued)
class TestChdir(unittest.TestCase):
def make_relative_path(self, *parts):
return os.path.join(
os.path.dirname(os.path.realpath(__file__)),
*parts,
)
def test_simple(self):
old_cwd = os.getcwd()
target = self.make_relative_path('data')
self.assertNotEqual(old_cwd, target)
with chdir(target):
self.assertEqual(os.getcwd(), target)
self.assertEqual(os.getcwd(), old_cwd)
def test_reentrant(self):
old_cwd = os.getcwd()
target1 = self.make_relative_path('data')
target2 = self.make_relative_path('ziptestdata')
self.assertNotIn(old_cwd, (target1, target2))
chdir1, chdir2 = chdir(target1), chdir(target2)
with chdir1:
self.assertEqual(os.getcwd(), target1)
with chdir2:
self.assertEqual(os.getcwd(), target2)
with chdir1:
self.assertEqual(os.getcwd(), target1)
self.assertEqual(os.getcwd(), target2)
self.assertEqual(os.getcwd(), target1)
self.assertEqual(os.getcwd(), old_cwd)
def test_exception(self):
old_cwd = os.getcwd()
target = self.make_relative_path('data')
self.assertNotEqual(old_cwd, target)
try:
with chdir(target):
self.assertEqual(os.getcwd(), target)
raise RuntimeError("boom")
except RuntimeError as re:
self.assertEqual(str(re), "boom")
self.assertEqual(os.getcwd(), old_cwd)
if __name__ == "__main__":
unittest.main()

View file

@ -8,21 +8,19 @@ import unittest
from test.test_contextlib import TestBaseExitStack
support.requires_working_socket(module=True)
def _async_test(func):
"""Decorator to turn an async function into a test case."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
coro = func(*args, **kwargs)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro)
finally:
loop.close()
asyncio.set_event_loop_policy(None)
asyncio.run(coro)
return wrapper
def tearDownModule():
asyncio.set_event_loop_policy(None)
class TestAbstractAsyncContextManager(unittest.TestCase):
@ -209,7 +207,18 @@ class AsyncContextManagerTestCase(unittest.TestCase):
async def woohoo():
yield
for stop_exc in (StopIteration('spam'), StopAsyncIteration('ham')):
class StopIterationSubclass(StopIteration):
pass
class StopAsyncIterationSubclass(StopAsyncIteration):
pass
for stop_exc in (
StopIteration('spam'),
StopAsyncIteration('ham'),
StopIterationSubclass('spam'),
StopAsyncIterationSubclass('spam')
):
with self.subTest(type=type(stop_exc)):
try:
async with woohoo():
@ -307,6 +316,82 @@ class AsyncContextManagerTestCase(unittest.TestCase):
self.assertEqual(ncols, 10)
self.assertEqual(depth, 0)
@_async_test
async def test_decorator(self):
entered = False
@asynccontextmanager
async def context():
nonlocal entered
entered = True
yield
entered = False
@context()
async def test():
self.assertTrue(entered)
self.assertFalse(entered)
await test()
self.assertFalse(entered)
@_async_test
async def test_decorator_with_exception(self):
entered = False
@asynccontextmanager
async def context():
nonlocal entered
try:
entered = True
yield
finally:
entered = False
@context()
async def test():
self.assertTrue(entered)
raise NameError('foo')
self.assertFalse(entered)
with self.assertRaisesRegex(NameError, 'foo'):
await test()
self.assertFalse(entered)
@_async_test
async def test_decorating_method(self):
@asynccontextmanager
async def context():
yield
class Test(object):
@context()
async def method(self, a, b, c=None):
self.a = a
self.b = b
self.c = c
# these tests are for argument passing when used as a decorator
test = Test()
await test.method(1, 2)
self.assertEqual(test.a, 1)
self.assertEqual(test.b, 2)
self.assertEqual(test.c, None)
test = Test()
await test.method('a', 'b', 'c')
self.assertEqual(test.a, 'a')
self.assertEqual(test.b, 'b')
self.assertEqual(test.c, 'c')
test = Test()
await test.method(a=1, b=2)
self.assertEqual(test.a, 1)
self.assertEqual(test.b, 2)
class AclosingTestCase(unittest.TestCase):
@ -399,6 +484,13 @@ class TestAsyncExitStack(TestBaseExitStack, unittest.TestCase):
return self.run_coroutine(self.__aexit__(*exc_details))
exit_stack = SyncAsyncExitStack
callback_error_internal_frames = [
('__exit__', 'return self.run_coroutine(self.__aexit__(*exc_details))'),
('run_coroutine', 'raise exc'),
('run_coroutine', 'raise exc'),
('__aexit__', 'raise exc_details[1]'),
('__aexit__', 'cb_suppress = cb(*exc_details)'),
]
def setUp(self):
self.loop = asyncio.new_event_loop()
@ -486,7 +578,7 @@ class TestAsyncExitStack(TestBaseExitStack, unittest.TestCase):
1/0
@_async_test
async def test_async_enter_context(self):
async def test_enter_async_context(self):
class TestCM(object):
async def __aenter__(self):
result.append(1)
@ -507,6 +599,26 @@ class TestAsyncExitStack(TestBaseExitStack, unittest.TestCase):
self.assertEqual(result, [1, 2, 3, 4])
@_async_test
async def test_enter_async_context_errors(self):
class LacksEnterAndExit:
pass
class LacksEnter:
async def __aexit__(self, *exc_info):
pass
class LacksExit:
async def __aenter__(self):
pass
async with self.exit_stack() as stack:
with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
await stack.enter_async_context(LacksEnterAndExit())
with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
await stack.enter_async_context(LacksEnter())
with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
await stack.enter_async_context(LacksExit())
self.assertFalse(stack._exit_callbacks)
@_async_test
async def test_async_exit_exception_chaining(self):
# Ensure exception chaining matches the reference behaviour
@ -539,6 +651,53 @@ class TestAsyncExitStack(TestBaseExitStack, unittest.TestCase):
self.assertIsInstance(inner_exc, ValueError)
self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
@_async_test
async def test_async_exit_exception_explicit_none_context(self):
# Ensure AsyncExitStack chaining matches actual nested `with` statements
# regarding explicit __context__ = None.
class MyException(Exception):
pass
@asynccontextmanager
async def my_cm():
try:
yield
except BaseException:
exc = MyException()
try:
raise exc
finally:
exc.__context__ = None
@asynccontextmanager
async def my_cm_with_exit_stack():
async with self.exit_stack() as stack:
await stack.enter_async_context(my_cm())
yield stack
for cm in (my_cm, my_cm_with_exit_stack):
with self.subTest():
try:
async with cm():
raise IndexError()
except MyException as exc:
self.assertIsNone(exc.__context__)
else:
self.fail("Expected IndexError, but no exception was raised")
@_async_test
async def test_instance_bypass_async(self):
class Example(object): pass
cm = Example()
cm.__aenter__ = object()
cm.__aexit__ = object()
stack = self.exit_stack()
with self.assertRaisesRegex(TypeError, 'asynchronous context manager'):
await stack.enter_async_context(cm)
stack.push_async_exit(cm)
self.assertIs(stack._exit_callbacks[-1][1], cm)
class TestAsyncNullcontext(unittest.TestCase):
@_async_test

View file

@ -0,0 +1,35 @@
# Test data for `test_zipfile`
The test executables in this directory are created manually from header.sh and
the `testdata_module_inside_zip.py` file. You must have infozip's zip utility
installed (`apt install zip` on Debian).
## Purpose
These are used to test executable files with an appended zipfile, in a scenario
where the executable is _not_ a Python interpreter itself so our automatic
zipimport machinery (that'd look for `__main__.py`) is not being used.
## Updating the test executables
If you update header.sh or the testdata_module_inside_zip.py file, rerun the
commands below. These are expected to be rarely changed, if ever.
### Standard old format (2.0) zip file
```
zip -0 zip2.zip testdata_module_inside_zip.py
cat header.sh zip2.zip >exe_with_zip
rm zip2.zip
```
### Modern format (4.5) zip64 file
Redirecting from stdin forces infozip's zip tool to create a zip64.
```
zip -0 <testdata_module_inside_zip.py >zip64.zip
cat header.sh zip64.zip >exe_with_z64
rm zip64.zip
```

BIN
test/ziptestdata/exe_with_z64 Executable file

Binary file not shown.

BIN
test/ziptestdata/exe_with_zip Executable file

Binary file not shown.

24
test/ziptestdata/header.sh Executable file
View file

@ -0,0 +1,24 @@
#!/bin/bash
INTERPRETER_UNDER_TEST="$1"
if [[ ! -x "${INTERPRETER_UNDER_TEST}" ]]; then
echo "Interpreter must be the command line argument."
exit 4
fi
EXECUTABLE="$0" exec "${INTERPRETER_UNDER_TEST}" -E - <<END_OF_PYTHON
import os
import zipfile
namespace = {}
filename = os.environ['EXECUTABLE']
print(f'Opening {filename} as a zipfile.')
with zipfile.ZipFile(filename, mode='r') as exe_zip:
for file_info in exe_zip.infolist():
data = exe_zip.read(file_info)
exec(data, namespace, namespace)
break # Only use the first file in the archive.
print('Favorite number in executable:', namespace["FAVORITE_NUMBER"])
### Archive contents will be appended after this file. ###
END_OF_PYTHON

View file

@ -0,0 +1,2 @@
# Test data file to be stored within a zip file.
FAVORITE_NUMBER = 5

View file

@ -1,5 +1,5 @@
[tox]
envlist = py{36,37,38,39,3_10,py3}
envlist = py{36,37,38,39,3_10,3_11,py3}
skip_missing_interpreters = True
[testenv]
@ -20,4 +20,5 @@ python =
3.8: py38
3.9: py39
3.10: py3_10
3.11: py3_11
pypy3: pypy3