From b8b7eb8ecd9e012178b5dcec4313edded751a459 Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Fri, 23 Dec 2022 11:27:46 +0100 Subject: [PATCH] sync with Python 3.11 --- .github/workflows/test.yml | 2 +- contextlib2/__init__.py | 154 +- contextlib2/__init__.pyi | 184 +- contextlib2/_typeshed.py | 8 +- dev/mypy.allowlist | 3 + test/data/README | 2 + test/support/__init__.py | 2140 ++++++++++++++++- test/support/os_helper.py | 717 +++++- test/test_contextlib.py | 219 +- test/test_contextlib_async.py | 177 +- test/ziptestdata/README.md | 35 + test/ziptestdata/exe_with_z64 | Bin 0 -> 978 bytes test/ziptestdata/exe_with_zip | Bin 0 -> 990 bytes test/ziptestdata/header.sh | 24 + .../ziptestdata/testdata_module_inside_zip.py | 2 + tox.ini | 3 +- 16 files changed, 3501 insertions(+), 169 deletions(-) create mode 100644 test/data/README create mode 100644 test/ziptestdata/README.md create mode 100755 test/ziptestdata/exe_with_z64 create mode 100755 test/ziptestdata/exe_with_zip create mode 100755 test/ziptestdata/header.sh create mode 100644 test/ziptestdata/testdata_module_inside_zip.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b2714f6..ce194d1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,7 +8,7 @@ jobs: strategy: max-parallel: 5 matrix: - python-version: [3.6, 3.7, 3.8, 3.9, '3.10', 'pypy-3.8'] + python-version: [3.6, 3.7, 3.8, 3.9, '3.10', '3.11', 'pypy-3.8'] steps: - uses: actions/checkout@v2 diff --git a/contextlib2/__init__.py b/contextlib2/__init__.py index d6c0c4a..4921e5a 100644 --- a/contextlib2/__init__.py +++ b/contextlib2/__init__.py @@ -1,8 +1,8 @@ """contextlib2 - backports and enhancements to the contextlib module""" import abc +import os import sys -import warnings import _collections_abc from collections import deque from functools import wraps @@ -17,16 +17,17 @@ except ImportError: class GenericAlias: pass - __all__ = ["asynccontextmanager", "contextmanager", "closing", "nullcontext", "AbstractContextManager", "AbstractAsyncContextManager", "AsyncExitStack", "ContextDecorator", "ExitStack", - "redirect_stdout", "redirect_stderr", "suppress", "aclosing"] + "redirect_stdout", "redirect_stderr", "suppress", "aclosing", + "chdir"] # Backwards compatibility __all__ += ["ContextStack"] class AbstractContextManager(abc.ABC): + """An abstract base class for context managers.""" __class_getitem__ = classmethod(GenericAlias) @@ -143,18 +144,20 @@ class _GeneratorContextManagerBase: # for the class instead. # See http://bugs.python.org/issue19404 for more details. - -class _GeneratorContextManager(_GeneratorContextManagerBase, - AbstractContextManager, - ContextDecorator): - """Helper for @contextmanager decorator.""" - def _recreate_cm(self): - # _GCM instances are one-shot context managers, so the + # _GCMB instances are one-shot context managers, so the # CM must be recreated each time a decorated function is # called return self.__class__(self.func, self.args, self.kwds) + +class _GeneratorContextManager( + _GeneratorContextManagerBase, + AbstractContextManager, + ContextDecorator, +): + """Helper for @contextmanager decorator.""" + def __enter__(self): # do not keep args and kwds alive unnecessarily # they are only needed for recreation, which is not possible anymore @@ -164,8 +167,8 @@ class _GeneratorContextManager(_GeneratorContextManagerBase, except StopIteration: raise RuntimeError("generator didn't yield") from None - def __exit__(self, type, value, traceback): - if type is None: + def __exit__(self, typ, value, traceback): + if typ is None: try: next(self.gen) except StopIteration: @@ -176,9 +179,9 @@ class _GeneratorContextManager(_GeneratorContextManagerBase, if value is None: # Need to force instantiation so we can reliably # tell if we get the same exception back - value = type() + value = typ() try: - self.gen.throw(type, value, traceback) + self.gen.throw(typ, value, traceback) except StopIteration as exc: # Suppress StopIteration *unless* it's the same exception that # was passed to throw(). This prevents a StopIteration @@ -187,43 +190,45 @@ class _GeneratorContextManager(_GeneratorContextManagerBase, except RuntimeError as exc: # Don't re-raise the passed in exception. (issue27122) if exc is value: + exc.__traceback__ = traceback return False - # Likewise, avoid suppressing if a StopIteration exception + # Avoid suppressing if a StopIteration exception # was passed to throw() and later wrapped into a RuntimeError - # (see PEP 479). - if type is StopIteration and exc.__cause__ is value: + # (see PEP 479 for sync generators; async generators also + # have this behavior). But do this only if the exception wrapped + # by the RuntimeError is actually Stop(Async)Iteration (see + # issue29692). + if ( + isinstance(value, StopIteration) + and exc.__cause__ is value + ): + exc.__traceback__ = traceback return False raise - except: + except BaseException as exc: # only re-raise if it's *not* the exception that was # passed to throw(), because __exit__() must not raise # an exception unless __exit__() itself failed. But throw() # has to raise the exception to signal propagation, so this # fixes the impedance mismatch between the throw() protocol # and the __exit__() protocol. - # - # This cannot use 'except BaseException as exc' (as in the - # async implementation) to maintain compatibility with - # Python 2, where old-style class exceptions are not caught - # by 'except BaseException'. - if sys.exc_info()[1] is value: - return False - raise + if exc is not value: + raise + exc.__traceback__ = traceback + return False raise RuntimeError("generator didn't stop after throw()") - -class _AsyncGeneratorContextManager(_GeneratorContextManagerBase, - AbstractAsyncContextManager, - AsyncContextDecorator): - """Helper for @asynccontextmanager.""" - - def _recreate_cm(self): - # _AGCM instances are one-shot context managers, so the - # ACM must be recreated each time a decorated function is - # called - return self.__class__(self.func, self.args, self.kwds) +class _AsyncGeneratorContextManager( + _GeneratorContextManagerBase, + AbstractAsyncContextManager, + AsyncContextDecorator, +): + """Helper for @asynccontextmanager decorator.""" async def __aenter__(self): + # do not keep args and kwds alive unnecessarily + # they are only needed for recreation, which is not possible anymore + del self.args, self.kwds, self.func try: return await self.gen.__anext__() except StopAsyncIteration: @@ -234,35 +239,48 @@ class _AsyncGeneratorContextManager(_GeneratorContextManagerBase, try: await self.gen.__anext__() except StopAsyncIteration: - return + return False else: raise RuntimeError("generator didn't stop") else: if value is None: + # Need to force instantiation so we can reliably + # tell if we get the same exception back value = typ() - # See _GeneratorContextManager.__exit__ for comments on subtleties - # in this implementation try: await self.gen.athrow(typ, value, traceback) - raise RuntimeError("generator didn't stop after athrow()") except StopAsyncIteration as exc: + # Suppress StopIteration *unless* it's the same exception that + # was passed to throw(). This prevents a StopIteration + # raised inside the "with" statement from being suppressed. return exc is not value except RuntimeError as exc: + # Don't re-raise the passed in exception. (issue27122) if exc is value: return False - # Avoid suppressing if a StopIteration exception - # was passed to throw() and later wrapped into a RuntimeError + # Avoid suppressing if a Stop(Async)Iteration exception + # was passed to athrow() and later wrapped into a RuntimeError # (see PEP 479 for sync generators; async generators also # have this behavior). But do this only if the exception wrapped # by the RuntimeError is actually Stop(Async)Iteration (see # issue29692). - if isinstance(value, (StopIteration, StopAsyncIteration)): - if exc.__cause__ is value: - return False + if ( + isinstance(value, (StopIteration, StopAsyncIteration)) + and exc.__cause__ is value + ): + return False raise except BaseException as exc: + # only re-raise if it's *not* the exception that was + # passed to throw(), because __exit__() must not raise + # an exception unless __exit__() itself failed. But throw() + # has to raise the exception to signal propagation, so this + # fixes the impedance mismatch between the throw() protocol + # and the __exit__() protocol. if exc is not value: raise + return False + raise RuntimeError("generator didn't stop after athrow()") def contextmanager(func): @@ -505,9 +523,14 @@ class _BaseExitStack: """ # We look up the special methods on the type to match the with # statement. - _cm_type = type(cm) - _exit = _cm_type.__exit__ - result = _cm_type.__enter__(cm) + cls = type(cm) + try: + _enter = cls.__enter__ + _exit = cls.__exit__ + except AttributeError: + raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does " + f"not support the context manager protocol") from None + result = _enter(cm) self._push_cm_exit(cm, _exit) return result @@ -565,10 +588,10 @@ class ExitStack(_BaseExitStack, AbstractContextManager): # Context may not be correct, so find the end of the chain while 1: exc_context = new_exc.__context__ - if exc_context is old_exc: + if exc_context is None or exc_context is old_exc: # Context is already set correctly (see issue 20317) return - if exc_context is None or exc_context is frame_exc: + if exc_context is frame_exc: break new_exc = exc_context # Change the end of the chain to point to the exception @@ -641,9 +664,15 @@ class AsyncExitStack(_BaseExitStack, AbstractAsyncContextManager): If successful, also pushes its __aexit__ method as a callback and returns the result of the __aenter__ method. """ - _cm_type = type(cm) - _exit = _cm_type.__aexit__ - result = await _cm_type.__aenter__(cm) + cls = type(cm) + try: + _enter = cls.__aenter__ + _exit = cls.__aexit__ + except AttributeError: + raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does " + f"not support the asynchronous context manager protocol" + ) from None + result = await _enter(cm) self._push_async_cm_exit(cm, _exit) return result @@ -708,10 +737,10 @@ class AsyncExitStack(_BaseExitStack, AbstractAsyncContextManager): # Context may not be correct, so find the end of the chain while 1: exc_context = new_exc.__context__ - if exc_context is old_exc: + if exc_context is None or exc_context is old_exc: # Context is already set correctly (see issue 20317) return - if exc_context is None or exc_context is frame_exc: + if exc_context is frame_exc: break new_exc = exc_context # Change the end of the chain to point to the exception @@ -779,6 +808,19 @@ class nullcontext(AbstractContextManager, AbstractAsyncContextManager): pass +class chdir(AbstractContextManager): + """Non thread-safe context manager to change the current working directory.""" + + def __init__(self, path): + self.path = path + self._old_cwd = [] + + def __enter__(self): + self._old_cwd.append(os.getcwd()) + os.chdir(self.path) + + def __exit__(self, *excinfo): + os.chdir(self._old_cwd.pop()) # Preserve backwards compatibility class ContextStack(ExitStack): """Backwards compatibility alias for ExitStack""" diff --git a/contextlib2/__init__.pyi b/contextlib2/__init__.pyi index 30ab561..6576fd3 100644 --- a/contextlib2/__init__.pyi +++ b/contextlib2/__init__.pyi @@ -1,24 +1,14 @@ # Type hints copied from the typeshed project under the Apache License 2.0 # https://github.com/python/typeshed/blob/64c85cdd449ccaff90b546676220c9ecfa6e697f/LICENSE +import abc import sys -from ._typeshed import Self +from ._typeshed import Self, StrOrBytesPath +from abc import abstractmethod +from collections.abc import AsyncGenerator, AsyncIterator, Awaitable, Callable, Generator, Iterator from types import TracebackType -from typing import ( - IO, - Any, - AsyncContextManager, - AsyncIterator, - Awaitable, - Callable, - ContextManager, - Iterator, - Optional, - Type, - TypeVar, - overload, -) -from typing_extensions import ParamSpec, Protocol +from typing import IO, Any, Generic, Protocol, TypeVar, overload, runtime_checkable +from typing_extensions import ParamSpec, TypeAlias # contextlib2 API adaptation notes: # * the various 'if True:' guards replace sys.version checks in the original @@ -26,98 +16,137 @@ from typing_extensions import ParamSpec, Protocol # * deliberately omitted APIs are listed in `dev/mypy.allowlist` # (e.g. deprecated experimental APIs that never graduated to the stdlib) -AbstractContextManager = ContextManager -if True: - AbstractAsyncContextManager = AsyncContextManager - _T = TypeVar("_T") _T_co = TypeVar("_T_co", covariant=True) -_T_io = TypeVar("_T_io", bound=Optional[IO[str]]) +_T_io = TypeVar("_T_io", bound=IO[str] | None) _F = TypeVar("_F", bound=Callable[..., Any]) _P = ParamSpec("_P") -_ExitFunc = Callable[[Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]], bool] -_CM_EF = TypeVar("_CM_EF", ContextManager[Any], _ExitFunc) +_ExitFunc: TypeAlias = Callable[[type[BaseException] | None, BaseException | None, TracebackType | None], bool | None] +_CM_EF = TypeVar("_CM_EF", bound=AbstractContextManager[Any] | _ExitFunc) -class _GeneratorContextManager(ContextManager[_T_co]): +@runtime_checkable +class AbstractContextManager(Protocol[_T_co]): + def __enter__(self) -> _T_co: ... + @abstractmethod + def __exit__( + self, __exc_type: type[BaseException] | None, __exc_value: BaseException | None, __traceback: TracebackType | None + ) -> bool | None: ... + +@runtime_checkable +class AbstractAsyncContextManager(Protocol[_T_co]): + async def __aenter__(self) -> _T_co: ... + @abstractmethod + async def __aexit__( + self, __exc_type: type[BaseException] | None, __exc_value: BaseException | None, __traceback: TracebackType | None + ) -> bool | None: ... + +class ContextDecorator: def __call__(self, func: _F) -> _F: ... -# type ignore to deal with incomplete ParamSpec support in mypy -def contextmanager(func: Callable[_P, Iterator[_T]]) -> Callable[_P, _GeneratorContextManager[_T]]: ... # type: ignore +class _GeneratorContextManager(AbstractContextManager[_T_co], ContextDecorator, Generic[_T_co]): + # __init__ and all instance attributes are actually inherited from _GeneratorContextManagerBase + # _GeneratorContextManagerBase is more trouble than it's worth to include in the stub; see #6676 + def __init__(self, func: Callable[..., Iterator[_T_co]], args: tuple[Any, ...], kwds: dict[str, Any]) -> None: ... + gen: Generator[_T_co, Any, Any] + func: Callable[..., Generator[_T_co, Any, Any]] + args: tuple[Any, ...] + kwds: dict[str, Any] + def __exit__( + self, typ: type[BaseException] | None, value: BaseException | None, traceback: TracebackType | None + ) -> bool | None: ... + +def contextmanager(func: Callable[_P, Iterator[_T_co]]) -> Callable[_P, _GeneratorContextManager[_T_co]]: ... if True: - def asynccontextmanager(func: Callable[_P, AsyncIterator[_T]]) -> Callable[_P, AsyncContextManager[_T]]: ... # type: ignore + _AF = TypeVar("_AF", bound=Callable[..., Awaitable[Any]]) + + class AsyncContextDecorator: + def __call__(self, func: _AF) -> _AF: ... + + class _AsyncGeneratorContextManager(AbstractAsyncContextManager[_T_co], AsyncContextDecorator, Generic[_T_co]): + # __init__ and these attributes are actually defined in the base class _GeneratorContextManagerBase, + # which is more trouble than it's worth to include in the stub (see #6676) + def __init__(self, func: Callable[..., AsyncIterator[_T_co]], args: tuple[Any, ...], kwds: dict[str, Any]) -> None: ... + gen: AsyncGenerator[_T_co, Any] + func: Callable[..., AsyncGenerator[_T_co, Any]] + args: tuple[Any, ...] + kwds: dict[str, Any] + async def __aexit__( + self, typ: type[BaseException] | None, value: BaseException | None, traceback: TracebackType | None + ) -> bool | None: ... + + +def asynccontextmanager(func: Callable[_P, AsyncIterator[_T_co]]) -> Callable[_P, _AsyncGeneratorContextManager[_T_co]]: ... class _SupportsClose(Protocol): def close(self) -> object: ... _SupportsCloseT = TypeVar("_SupportsCloseT", bound=_SupportsClose) -class closing(ContextManager[_SupportsCloseT]): +class closing(AbstractContextManager[_SupportsCloseT]): def __init__(self, thing: _SupportsCloseT) -> None: ... + def __exit__(self, *exc_info: object) -> None: ... if True: class _SupportsAclose(Protocol): def aclose(self) -> Awaitable[object]: ... _SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose) - class aclosing(AsyncContextManager[_SupportsAcloseT]): - def __init__(self, thing: _SupportsAcloseT) -> None: ... - _AF = TypeVar("_AF", bound=Callable[..., Awaitable[Any]]) - class AsyncContextDecorator: - def __call__(self, func: _AF) -> _AF: ... -class suppress(ContextManager[None]): - def __init__(self, *exceptions: Type[BaseException]) -> None: ... + class aclosing(AbstractAsyncContextManager[_SupportsAcloseT]): + def __init__(self, thing: _SupportsAcloseT) -> None: ... + async def __aexit__(self, *exc_info: object) -> None: ... + +class suppress(AbstractContextManager[None]): + def __init__(self, *exceptions: type[BaseException]) -> None: ... def __exit__( - self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType] + self, exctype: type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None ) -> bool: ... -class redirect_stdout(ContextManager[_T_io]): +class _RedirectStream(AbstractContextManager[_T_io]): def __init__(self, new_target: _T_io) -> None: ... + def __exit__( + self, exctype: type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None + ) -> None: ... -class redirect_stderr(ContextManager[_T_io]): - def __init__(self, new_target: _T_io) -> None: ... +class redirect_stdout(_RedirectStream[_T_io]): ... +class redirect_stderr(_RedirectStream[_T_io]): ... -class ContextDecorator: - def __call__(self, func: _F) -> _F: ... - -class ExitStack(ContextManager[ExitStack]): - def __init__(self) -> None: ... - def enter_context(self, cm: ContextManager[_T]) -> _T: ... +# In reality this is a subclass of `AbstractContextManager`; +# see #7961 for why we don't do that in the stub +class ExitStack(metaclass=abc.ABCMeta): + def enter_context(self, cm: AbstractContextManager[_T]) -> _T: ... def push(self, exit: _CM_EF) -> _CM_EF: ... - def callback(self, callback: Callable[..., Any], *args: Any, **kwds: Any) -> Callable[..., Any]: ... + def callback(self, __callback: Callable[_P, _T], *args: _P.args, **kwds: _P.kwargs) -> Callable[_P, _T]: ... def pop_all(self: Self) -> Self: ... def close(self) -> None: ... def __enter__(self: Self) -> Self: ... def __exit__( - self, - __exc_type: Optional[Type[BaseException]], - __exc_value: Optional[BaseException], - __traceback: Optional[TracebackType], + self, __exc_type: type[BaseException] | None, __exc_value: BaseException | None, __traceback: TracebackType | None ) -> bool: ... -if True: - _ExitCoroFunc = Callable[[Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]], Awaitable[bool]] - _CallbackCoroFunc = Callable[..., Awaitable[Any]] - _ACM_EF = TypeVar("_ACM_EF", AsyncContextManager[Any], _ExitCoroFunc) - class AsyncExitStack(AsyncContextManager[AsyncExitStack]): - def __init__(self) -> None: ... - def enter_context(self, cm: ContextManager[_T]) -> _T: ... - def enter_async_context(self, cm: AsyncContextManager[_T]) -> Awaitable[_T]: ... - def push(self, exit: _CM_EF) -> _CM_EF: ... - def push_async_exit(self, exit: _ACM_EF) -> _ACM_EF: ... - def callback(self, callback: Callable[..., Any], *args: Any, **kwds: Any) -> Callable[..., Any]: ... - def push_async_callback(self, callback: _CallbackCoroFunc, *args: Any, **kwds: Any) -> _CallbackCoroFunc: ... - def pop_all(self: Self) -> Self: ... - def aclose(self) -> Awaitable[None]: ... - def __aenter__(self: Self) -> Awaitable[Self]: ... - def __aexit__( - self, - __exc_type: Optional[Type[BaseException]], - __exc_value: Optional[BaseException], - __traceback: Optional[TracebackType], - ) -> Awaitable[bool]: ... +_ExitCoroFunc: TypeAlias = Callable[ + [type[BaseException] | None, BaseException | None, TracebackType | None], Awaitable[bool | None] +] +_ACM_EF = TypeVar("_ACM_EF", bound=AbstractAsyncContextManager[Any] | _ExitCoroFunc) +# In reality this is a subclass of `AbstractAsyncContextManager`; +# see #7961 for why we don't do that in the stub +class AsyncExitStack(metaclass=abc.ABCMeta): + def enter_context(self, cm: AbstractContextManager[_T]) -> _T: ... + async def enter_async_context(self, cm: AbstractAsyncContextManager[_T]) -> _T: ... + def push(self, exit: _CM_EF) -> _CM_EF: ... + def push_async_exit(self, exit: _ACM_EF) -> _ACM_EF: ... + def callback(self, __callback: Callable[_P, _T], *args: _P.args, **kwds: _P.kwargs) -> Callable[_P, _T]: ... + def push_async_callback( + self, __callback: Callable[_P, Awaitable[_T]], *args: _P.args, **kwds: _P.kwargs + ) -> Callable[_P, Awaitable[_T]]: ... + def pop_all(self: Self) -> Self: ... + async def aclose(self) -> None: ... + async def __aenter__(self: Self) -> Self: ... + async def __aexit__( + self, __exc_type: type[BaseException] | None, __exc_value: BaseException | None, __traceback: TracebackType | None + ) -> bool: ... if True: class nullcontext(AbstractContextManager[_T], AbstractAsyncContextManager[_T]): @@ -127,6 +156,15 @@ if True: @overload def __init__(self: nullcontext[_T], enter_result: _T) -> None: ... def __enter__(self) -> _T: ... - def __exit__(self, *exctype: Any) -> None: ... + def __exit__(self, *exctype: object) -> None: ... async def __aenter__(self) -> _T: ... - async def __aexit__(self, *exctype: Any) -> None: ... + async def __aexit__(self, *exctype: object) -> None: ... + +if True: + _T_fd_or_any_path = TypeVar("_T_fd_or_any_path", bound=int | StrOrBytesPath) + + class chdir(AbstractContextManager[None], Generic[_T_fd_or_any_path]): + path: _T_fd_or_any_path + def __init__(self, path: _T_fd_or_any_path) -> None: ... + def __enter__(self) -> None: ... + def __exit__(self, *excinfo: object) -> None: ... diff --git a/contextlib2/_typeshed.py b/contextlib2/_typeshed.py index 9e065ba..e12992b 100644 --- a/contextlib2/_typeshed.py +++ b/contextlib2/_typeshed.py @@ -1,5 +1,9 @@ -from typing import TypeVar # pragma: no cover +from os import PathLike +from typing import TypeVar +from typing_extensions import TypeAlias # Use for "self" annotations: # def __enter__(self: Self) -> Self: ... -Self = TypeVar("Self") # pragma: no cover +Self = TypeVar("Self") # noqa: Y001 + +StrOrBytesPath: TypeAlias = str | bytes | PathLike[str] | PathLike[bytes] # stable diff --git a/dev/mypy.allowlist b/dev/mypy.allowlist index e9e26a9..bc8d472 100644 --- a/dev/mypy.allowlist +++ b/dev/mypy.allowlist @@ -1,3 +1,6 @@ +contextlib2.AbstractContextManager.__class_getitem__ +contextlib2.AbstractAsyncContextManager.__class_getitem__ +contextlib2._typeshed.PathLike.__class_getitem__ # Deprecated APIs that never graduated to the standard library contextlib2.ContextDecorator.refresh_cm contextlib2.ContextStack diff --git a/test/data/README b/test/data/README new file mode 100644 index 0000000..bd05984 --- /dev/null +++ b/test/data/README @@ -0,0 +1,2 @@ +This empty directory serves as destination for temporary files +created by some tests, in particular, the test_codecmaps_* tests. diff --git a/test/support/__init__.py b/test/support/__init__.py index 1e708d1..fa1130d 100644 --- a/test/support/__init__.py +++ b/test/support/__init__.py @@ -1,6 +1,2136 @@ -"""Enough of the test.support APIs to run the contextlib test suite""" -import sys -import unittest +"""Supporting definitions for the Python regression tests.""" -requires_docstrings = unittest.skipIf(sys.flags.optimize >= 2, - "Test requires docstrings") +if __name__ != 'test.support': + raise ImportError('support must be imported from the test package') + +import contextlib +import functools +import getpass +import os +import re +import stat +import sys +import sysconfig +import time +import types +import unittest +import warnings + +try: + from _testcapi import unicode_legacy_string +except ImportError: + unicode_legacy_string = None + +__all__ = [ + # globals + "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast", + # exceptions + "Error", "TestFailed", "TestDidNotRun", "ResourceDenied", + # io + "record_original_stdout", "get_original_stdout", "captured_stdout", + "captured_stdin", "captured_stderr", + # unittest + "is_resource_enabled", "requires", "requires_freebsd_version", + "requires_linux_version", "requires_mac_ver", + "check_syntax_error", + "BasicTestRunner", "run_unittest", "run_doctest", + "requires_gzip", "requires_bz2", "requires_lzma", + "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", + "requires_IEEE_754", "requires_zlib", + "has_fork_support", "requires_fork", + "has_subprocess_support", "requires_subprocess", + "has_socket_support", "requires_working_socket", + "anticipate_failure", "load_package_tests", "detect_api_mismatch", + "check__all__", "skip_if_buggy_ucrt_strfptime", + "check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer", + # sys + "is_jython", "is_android", "is_emscripten", "is_wasi", + "check_impl_detail", "unix_shell", "setswitchinterval", + # network + "open_urlresource", + # processes + "reap_children", + # miscellaneous + "run_with_locale", "swap_item", "findfile", "infinite_recursion", + "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", + "run_with_tz", "PGO", "missing_compiler_executable", + "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST", + "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT", + ] + + +# Timeout in seconds for tests using a network server listening on the network +# local loopback interface like 127.0.0.1. +# +# The timeout is long enough to prevent test failure: it takes into account +# that the client and the server can run in different threads or even different +# processes. +# +# The timeout should be long enough for connect(), recv() and send() methods +# of socket.socket. +LOOPBACK_TIMEOUT = 5.0 +if sys.platform == 'win32' and ' 32 bit (ARM)' in sys.version: + # bpo-37553: test_socket.SendfileUsingSendTest is taking longer than 2 + # seconds on Windows ARM32 buildbot + LOOPBACK_TIMEOUT = 10 +elif sys.platform == 'vxworks': + LOOPBACK_TIMEOUT = 10 + +# Timeout in seconds for network requests going to the internet. The timeout is +# short enough to prevent a test to wait for too long if the internet request +# is blocked for whatever reason. +# +# Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed, +# but skip the test instead: see transient_internet(). +INTERNET_TIMEOUT = 60.0 + +# Timeout in seconds to mark a test as failed if the test takes "too long". +# +# The timeout value depends on the regrtest --timeout command line option. +# +# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use +# LONG_TIMEOUT instead. +SHORT_TIMEOUT = 30.0 + +# Timeout in seconds to detect when a test hangs. +# +# It is long enough to reduce the risk of test failure on the slowest Python +# buildbots. It should not be used to mark a test as failed if the test takes +# "too long". The timeout value depends on the regrtest --timeout command line +# option. +LONG_TIMEOUT = 5 * 60.0 + +# TEST_HOME_DIR refers to the top level directory of the "test" package +# that contains Python's regression test suite +TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) +TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR) +STDLIB_DIR = os.path.dirname(TEST_HOME_DIR) +REPO_ROOT = os.path.dirname(STDLIB_DIR) + + +class Error(Exception): + """Base class for regression test exceptions.""" + +class TestFailed(Error): + """Test failed.""" + +class TestFailedWithDetails(TestFailed): + """Test failed.""" + def __init__(self, msg, errors, failures): + self.msg = msg + self.errors = errors + self.failures = failures + super().__init__(msg, errors, failures) + + def __str__(self): + return self.msg + +class TestDidNotRun(Error): + """Test did not run any subtests.""" + +class ResourceDenied(unittest.SkipTest): + """Test skipped because it requested a disallowed resource. + + This is raised when a test calls requires() for a resource that + has not be enabled. It is used to distinguish between expected + and unexpected skips. + """ + +def anticipate_failure(condition): + """Decorator to mark a test that is known to be broken in some cases + + Any use of this decorator should have a comment identifying the + associated tracker issue. + """ + if condition: + return unittest.expectedFailure + return lambda f: f + +def load_package_tests(pkg_dir, loader, standard_tests, pattern): + """Generic load_tests implementation for simple test packages. + + Most packages can implement load_tests using this function as follows: + + def load_tests(*args): + return load_package_tests(os.path.dirname(__file__), *args) + """ + if pattern is None: + pattern = "test*" + top_dir = STDLIB_DIR + package_tests = loader.discover(start_dir=pkg_dir, + top_level_dir=top_dir, + pattern=pattern) + standard_tests.addTests(package_tests) + return standard_tests + + +def get_attribute(obj, name): + """Get an attribute, raising SkipTest if AttributeError is raised.""" + try: + attribute = getattr(obj, name) + except AttributeError: + raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) + else: + return attribute + +verbose = 1 # Flag set to 0 by regrtest.py +use_resources = None # Flag set to [] by regrtest.py +max_memuse = 0 # Disable bigmem tests (they will still be run with + # small sizes, to make sure they work.) +real_max_memuse = 0 +junit_xml_list = None # list of testsuite XML elements +failfast = False + +# _original_stdout is meant to hold stdout at the time regrtest began. +# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. +# The point is to have some flavor of stdout the user can actually see. +_original_stdout = None +def record_original_stdout(stdout): + global _original_stdout + _original_stdout = stdout + +def get_original_stdout(): + return _original_stdout or sys.stdout + + +def _force_run(path, func, *args): + try: + return func(*args) + except FileNotFoundError as err: + # chmod() won't fix a missing file. + if verbose >= 2: + print('%s: %s' % (err.__class__.__name__, err)) + raise + except OSError as err: + if verbose >= 2: + print('%s: %s' % (err.__class__.__name__, err)) + print('re-run %s%r' % (func.__name__, args)) + os.chmod(path, stat.S_IRWXU) + return func(*args) + + +# Check whether a gui is actually available +def _is_gui_available(): + if hasattr(_is_gui_available, 'result'): + return _is_gui_available.result + import platform + reason = None + if sys.platform.startswith('win') and platform.win32_is_iot(): + reason = "gui is not available on Windows IoT Core" + elif sys.platform.startswith('win'): + # if Python is running as a service (such as the buildbot service), + # gui interaction may be disallowed + import ctypes + import ctypes.wintypes + UOI_FLAGS = 1 + WSF_VISIBLE = 0x0001 + class USEROBJECTFLAGS(ctypes.Structure): + _fields_ = [("fInherit", ctypes.wintypes.BOOL), + ("fReserved", ctypes.wintypes.BOOL), + ("dwFlags", ctypes.wintypes.DWORD)] + dll = ctypes.windll.user32 + h = dll.GetProcessWindowStation() + if not h: + raise ctypes.WinError() + uof = USEROBJECTFLAGS() + needed = ctypes.wintypes.DWORD() + res = dll.GetUserObjectInformationW(h, + UOI_FLAGS, + ctypes.byref(uof), + ctypes.sizeof(uof), + ctypes.byref(needed)) + if not res: + raise ctypes.WinError() + if not bool(uof.dwFlags & WSF_VISIBLE): + reason = "gui not available (WSF_VISIBLE flag not set)" + elif sys.platform == 'darwin': + # The Aqua Tk implementations on OS X can abort the process if + # being called in an environment where a window server connection + # cannot be made, for instance when invoked by a buildbot or ssh + # process not running under the same user id as the current console + # user. To avoid that, raise an exception if the window manager + # connection is not available. + from ctypes import cdll, c_int, pointer, Structure + from ctypes.util import find_library + + app_services = cdll.LoadLibrary(find_library("ApplicationServices")) + + if app_services.CGMainDisplayID() == 0: + reason = "gui tests cannot run without OS X window manager" + else: + class ProcessSerialNumber(Structure): + _fields_ = [("highLongOfPSN", c_int), + ("lowLongOfPSN", c_int)] + psn = ProcessSerialNumber() + psn_p = pointer(psn) + if ( (app_services.GetCurrentProcess(psn_p) < 0) or + (app_services.SetFrontProcess(psn_p) < 0) ): + reason = "cannot run without OS X gui process" + + # check on every platform whether tkinter can actually do anything + if not reason: + try: + from tkinter import Tk + root = Tk() + root.withdraw() + root.update() + root.destroy() + except Exception as e: + err_string = str(e) + if len(err_string) > 50: + err_string = err_string[:50] + ' [...]' + reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__, + err_string) + + _is_gui_available.reason = reason + _is_gui_available.result = not reason + + return _is_gui_available.result + +def is_resource_enabled(resource): + """Test whether a resource is enabled. + + Known resources are set by regrtest.py. If not running under regrtest.py, + all resources are assumed enabled unless use_resources has been set. + """ + return use_resources is None or resource in use_resources + +def requires(resource, msg=None): + """Raise ResourceDenied if the specified resource is not available.""" + if not is_resource_enabled(resource): + if msg is None: + msg = "Use of the %r resource not enabled" % resource + raise ResourceDenied(msg) + if resource in {"network", "urlfetch"} and not has_socket_support: + raise ResourceDenied("No socket support") + if resource == 'gui' and not _is_gui_available(): + raise ResourceDenied(_is_gui_available.reason) + +def _requires_unix_version(sysname, min_version): + """Decorator raising SkipTest if the OS is `sysname` and the version is less + than `min_version`. + + For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if + the FreeBSD version is less than 7.2. + """ + import platform + min_version_txt = '.'.join(map(str, min_version)) + version_txt = platform.release().split('-', 1)[0] + if platform.system() == sysname: + try: + version = tuple(map(int, version_txt.split('.'))) + except ValueError: + skip = False + else: + skip = version < min_version + else: + skip = False + + return unittest.skipIf( + skip, + f"{sysname} version {min_version_txt} or higher required, not " + f"{version_txt}" + ) + + +def requires_freebsd_version(*min_version): + """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is + less than `min_version`. + + For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD + version is less than 7.2. + """ + return _requires_unix_version('FreeBSD', min_version) + +def requires_linux_version(*min_version): + """Decorator raising SkipTest if the OS is Linux and the Linux version is + less than `min_version`. + + For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux + version is less than 2.6.32. + """ + return _requires_unix_version('Linux', min_version) + +def requires_mac_ver(*min_version): + """Decorator raising SkipTest if the OS is Mac OS X and the OS X + version if less than min_version. + + For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version + is lesser than 10.5. + """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kw): + if sys.platform == 'darwin': + import platform + version_txt = platform.mac_ver()[0] + try: + version = tuple(map(int, version_txt.split('.'))) + except ValueError: + pass + else: + if version < min_version: + min_version_txt = '.'.join(map(str, min_version)) + raise unittest.SkipTest( + "Mac OS X %s or higher required, not %s" + % (min_version_txt, version_txt)) + return func(*args, **kw) + wrapper.min_version = min_version + return wrapper + return decorator + + +def skip_if_buildbot(reason=None): + """Decorator raising SkipTest if running on a buildbot.""" + if not reason: + reason = 'not suitable for buildbots' + try: + isbuildbot = getpass.getuser().lower() == 'buildbot' + except (KeyError, EnvironmentError) as err: + warnings.warn(f'getpass.getuser() failed {err}.', RuntimeWarning) + isbuildbot = False + return unittest.skipIf(isbuildbot, reason) + +def check_sanitizer(*, address=False, memory=False, ub=False): + """Returns True if Python is compiled with sanitizer support""" + if not (address or memory or ub): + raise ValueError('At least one of address, memory, or ub must be True') + + + _cflags = sysconfig.get_config_var('CFLAGS') or '' + _config_args = sysconfig.get_config_var('CONFIG_ARGS') or '' + memory_sanitizer = ( + '-fsanitize=memory' in _cflags or + '--with-memory-sanitizer' in _config_args + ) + address_sanitizer = ( + '-fsanitize=address' in _cflags or + '--with-memory-sanitizer' in _config_args + ) + ub_sanitizer = ( + '-fsanitize=undefined' in _cflags or + '--with-undefined-behavior-sanitizer' in _config_args + ) + return ( + (memory and memory_sanitizer) or + (address and address_sanitizer) or + (ub and ub_sanitizer) + ) + + +def skip_if_sanitizer(reason=None, *, address=False, memory=False, ub=False): + """Decorator raising SkipTest if running with a sanitizer active.""" + if not reason: + reason = 'not working with sanitizers active' + skip = check_sanitizer(address=address, memory=memory, ub=ub) + return unittest.skipIf(skip, reason) + + +def system_must_validate_cert(f): + """Skip the test on TLS certificate validation failures.""" + @functools.wraps(f) + def dec(*args, **kwargs): + try: + f(*args, **kwargs) + except OSError as e: + if "CERTIFICATE_VERIFY_FAILED" in str(e): + raise unittest.SkipTest("system does not contain " + "necessary certificates") + raise + return dec + +# A constant likely larger than the underlying OS pipe buffer size, to +# make writes blocking. +# Windows limit seems to be around 512 B, and many Unix kernels have a +# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. +# (see issue #17835 for a discussion of this number). +PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1 + +# A constant likely larger than the underlying OS socket buffer size, to make +# writes blocking. +# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl +# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643 +# for a discussion of this number. +SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1 + +# decorator for skipping tests on non-IEEE 754 platforms +requires_IEEE_754 = unittest.skipUnless( + float.__getformat__("double").startswith("IEEE"), + "test requires IEEE 754 doubles") + +def requires_zlib(reason='requires zlib'): + try: + import zlib + except ImportError: + zlib = None + return unittest.skipUnless(zlib, reason) + +def requires_gzip(reason='requires gzip'): + try: + import gzip + except ImportError: + gzip = None + return unittest.skipUnless(gzip, reason) + +def requires_bz2(reason='requires bz2'): + try: + import bz2 + except ImportError: + bz2 = None + return unittest.skipUnless(bz2, reason) + +def requires_lzma(reason='requires lzma'): + try: + import lzma + except ImportError: + lzma = None + return unittest.skipUnless(lzma, reason) + +def has_no_debug_ranges(): + try: + import _testinternalcapi + except ImportError: + raise unittest.SkipTest("_testinternalcapi required") + config = _testinternalcapi.get_config() + return not bool(config['code_debug_ranges']) + +def requires_debug_ranges(reason='requires co_positions / debug_ranges'): + return unittest.skipIf(has_no_debug_ranges(), reason) + +requires_legacy_unicode_capi = unittest.skipUnless(unicode_legacy_string, + 'requires legacy Unicode C API') + +is_jython = sys.platform.startswith('java') + +is_android = hasattr(sys, 'getandroidapilevel') + +if sys.platform not in ('win32', 'vxworks'): + unix_shell = '/system/bin/sh' if is_android else '/bin/sh' +else: + unix_shell = None + +# wasm32-emscripten and -wasi are POSIX-like but do not +# have subprocess or fork support. +is_emscripten = sys.platform == "emscripten" +is_wasi = sys.platform == "wasi" + +has_fork_support = hasattr(os, "fork") and not is_emscripten and not is_wasi + +def requires_fork(): + return unittest.skipUnless(has_fork_support, "requires working os.fork()") + +has_subprocess_support = not is_emscripten and not is_wasi + +def requires_subprocess(): + """Used for subprocess, os.spawn calls, fd inheritance""" + return unittest.skipUnless(has_subprocess_support, "requires subprocess support") + +# Emscripten's socket emulation and WASI sockets have limitations. +has_socket_support = not is_emscripten and not is_wasi + +def requires_working_socket(*, module=False): + """Skip tests or modules that require working sockets + + Can be used as a function/class decorator or to skip an entire module. + """ + msg = "requires socket support" + if module: + if not has_socket_support: + raise unittest.SkipTest(msg) + else: + return unittest.skipUnless(has_socket_support, msg) + +# Does strftime() support glibc extension like '%4Y'? +has_strftime_extensions = False +if sys.platform != "win32": + # bpo-47037: Windows debug builds crash with "Debug Assertion Failed" + try: + has_strftime_extensions = time.strftime("%4Y") != "%4Y" + except ValueError: + pass + +# Define the URL of a dedicated HTTP server for the network tests. +# The URL must use clear-text HTTP: no redirection to encrypted HTTPS. +TEST_HTTP_URL = "http://www.pythontest.net" + +# Set by libregrtest/main.py so we can skip tests that are not +# useful for PGO +PGO = False + +# Set by libregrtest/main.py if we are running the extended (time consuming) +# PGO task. If this is True, PGO is also True. +PGO_EXTENDED = False + +# TEST_DATA_DIR is used as a target download location for remote resources +TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data") + + +def darwin_malloc_err_warning(test_name): + """Assure user that loud errors generated by macOS libc's malloc are + expected.""" + if sys.platform != 'darwin': + return + + import shutil + msg = ' NOTICE ' + detail = (f'{test_name} may generate "malloc can\'t allocate region"\n' + 'warnings on macOS systems. This behavior is known. Do not\n' + 'report a bug unless tests are also failing. See bpo-40928.') + + padding, _ = shutil.get_terminal_size() + print(msg.center(padding, '-')) + print(detail) + print('-' * padding) + + +def findfile(filename, subdir=None): + """Try to find a file on sys.path or in the test directory. If it is not + found the argument passed to the function is returned (this does not + necessarily signal failure; could still be the legitimate path). + + Setting *subdir* indicates a relative path to use to find the file + rather than looking directly in the path directories. + """ + if os.path.isabs(filename): + return filename + if subdir is not None: + filename = os.path.join(subdir, filename) + path = [TEST_HOME_DIR] + sys.path + for dn in path: + fn = os.path.join(dn, filename) + if os.path.exists(fn): return fn + return filename + + +def sortdict(dict): + "Like repr(dict), but in sorted order." + items = sorted(dict.items()) + reprpairs = ["%r: %r" % pair for pair in items] + withcommas = ", ".join(reprpairs) + return "{%s}" % withcommas + +def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None): + with testcase.assertRaisesRegex(SyntaxError, errtext) as cm: + compile(statement, '', 'exec') + err = cm.exception + testcase.assertIsNotNone(err.lineno) + if lineno is not None: + testcase.assertEqual(err.lineno, lineno) + testcase.assertIsNotNone(err.offset) + if offset is not None: + testcase.assertEqual(err.offset, offset) + + +def open_urlresource(url, *args, **kw): + import urllib.request, urllib.parse + from .os_helper import unlink + try: + import gzip + except ImportError: + gzip = None + + check = kw.pop('check', None) + + filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! + + fn = os.path.join(TEST_DATA_DIR, filename) + + def check_valid_file(fn): + f = open(fn, *args, **kw) + if check is None: + return f + elif check(f): + f.seek(0) + return f + f.close() + + if os.path.exists(fn): + f = check_valid_file(fn) + if f is not None: + return f + unlink(fn) + + # Verify the requirement before downloading the file + requires('urlfetch') + + if verbose: + print('\tfetching %s ...' % url, file=get_original_stdout()) + opener = urllib.request.build_opener() + if gzip: + opener.addheaders.append(('Accept-Encoding', 'gzip')) + f = opener.open(url, timeout=INTERNET_TIMEOUT) + if gzip and f.headers.get('Content-Encoding') == 'gzip': + f = gzip.GzipFile(fileobj=f) + try: + with open(fn, "wb") as out: + s = f.read() + while s: + out.write(s) + s = f.read() + finally: + f.close() + + f = check_valid_file(fn) + if f is not None: + return f + raise TestFailed('invalid resource %r' % fn) + + +@contextlib.contextmanager +def captured_output(stream_name): + """Return a context manager used by captured_stdout/stdin/stderr + that temporarily replaces the sys stream *stream_name* with a StringIO.""" + import io + orig_stdout = getattr(sys, stream_name) + setattr(sys, stream_name, io.StringIO()) + try: + yield getattr(sys, stream_name) + finally: + setattr(sys, stream_name, orig_stdout) + +def captured_stdout(): + """Capture the output of sys.stdout: + + with captured_stdout() as stdout: + print("hello") + self.assertEqual(stdout.getvalue(), "hello\\n") + """ + return captured_output("stdout") + +def captured_stderr(): + """Capture the output of sys.stderr: + + with captured_stderr() as stderr: + print("hello", file=sys.stderr) + self.assertEqual(stderr.getvalue(), "hello\\n") + """ + return captured_output("stderr") + +def captured_stdin(): + """Capture the input to sys.stdin: + + with captured_stdin() as stdin: + stdin.write('hello\\n') + stdin.seek(0) + # call test code that consumes from sys.stdin + captured = input() + self.assertEqual(captured, "hello") + """ + return captured_output("stdin") + + +def gc_collect(): + """Force as many objects as possible to be collected. + + In non-CPython implementations of Python, this is needed because timely + deallocation is not guaranteed by the garbage collector. (Even in CPython + this can be the case in case of reference cycles.) This means that __del__ + methods may be called later than expected and weakrefs may remain alive for + longer than expected. This function tries its best to force all garbage + objects to disappear. + """ + import gc + gc.collect() + if is_jython: + time.sleep(0.1) + gc.collect() + gc.collect() + +@contextlib.contextmanager +def disable_gc(): + import gc + have_gc = gc.isenabled() + gc.disable() + try: + yield + finally: + if have_gc: + gc.enable() + + +def python_is_optimized(): + """Find if Python was built with optimizations.""" + cflags = sysconfig.get_config_var('PY_CFLAGS') or '' + final_opt = "" + for opt in cflags.split(): + if opt.startswith('-O'): + final_opt = opt + return final_opt not in ('', '-O0', '-Og') + + +_header = 'nP' +_align = '0n' +if hasattr(sys, "getobjects"): + _header = '2P' + _header + _align = '0P' +_vheader = _header + 'n' + +def calcobjsize(fmt): + import struct + return struct.calcsize(_header + fmt + _align) + +def calcvobjsize(fmt): + import struct + return struct.calcsize(_vheader + fmt + _align) + + +_TPFLAGS_HAVE_GC = 1<<14 +_TPFLAGS_HEAPTYPE = 1<<9 + +def check_sizeof(test, o, size): + try: + import _testinternalcapi + except ImportError: + raise unittest.SkipTest("_testinternalcapi required") + result = sys.getsizeof(o) + # add GC header size + if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ + ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): + size += _testinternalcapi.SIZEOF_PYGC_HEAD + msg = 'wrong size for %s: got %d, expected %d' \ + % (type(o), result, size) + test.assertEqual(result, size, msg) + + +#======================================================================= +# Decorator for running a function in a specific timezone, correctly +# resetting it afterwards. + +def run_with_tz(tz): + def decorator(func): + def inner(*args, **kwds): + try: + tzset = time.tzset + except AttributeError: + raise unittest.SkipTest("tzset required") + if 'TZ' in os.environ: + orig_tz = os.environ['TZ'] + else: + orig_tz = None + os.environ['TZ'] = tz + tzset() + + # now run the function, resetting the tz on exceptions + try: + return func(*args, **kwds) + finally: + if orig_tz is None: + del os.environ['TZ'] + else: + os.environ['TZ'] = orig_tz + time.tzset() + + inner.__name__ = func.__name__ + inner.__doc__ = func.__doc__ + return inner + return decorator + +#======================================================================= +# Big-memory-test support. Separate from 'resources' because memory use +# should be configurable. + +# Some handy shorthands. Note that these are used for byte-limits as well +# as size-limits, in the various bigmem tests +_1M = 1024*1024 +_1G = 1024 * _1M +_2G = 2 * _1G +_4G = 4 * _1G + +MAX_Py_ssize_t = sys.maxsize + +def set_memlimit(limit): + global max_memuse + global real_max_memuse + sizes = { + 'k': 1024, + 'm': _1M, + 'g': _1G, + 't': 1024*_1G, + } + m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, + re.IGNORECASE | re.VERBOSE) + if m is None: + raise ValueError('Invalid memory limit %r' % (limit,)) + memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) + real_max_memuse = memlimit + if memlimit > MAX_Py_ssize_t: + memlimit = MAX_Py_ssize_t + if memlimit < _2G - 1: + raise ValueError('Memory limit %r too low to be useful' % (limit,)) + max_memuse = memlimit + +class _MemoryWatchdog: + """An object which periodically watches the process' memory consumption + and prints it out. + """ + + def __init__(self): + self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) + self.started = False + + def start(self): + import warnings + try: + f = open(self.procfile, 'r') + except OSError as e: + warnings.warn('/proc not available for stats: {}'.format(e), + RuntimeWarning) + sys.stderr.flush() + return + + import subprocess + with f: + watchdog_script = findfile("memory_watchdog.py") + self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], + stdin=f, + stderr=subprocess.DEVNULL) + self.started = True + + def stop(self): + if self.started: + self.mem_watchdog.terminate() + self.mem_watchdog.wait() + + +def bigmemtest(size, memuse, dry_run=True): + """Decorator for bigmem tests. + + 'size' is a requested size for the test (in arbitrary, test-interpreted + units.) 'memuse' is the number of bytes per unit for the test, or a good + estimate of it. For example, a test that needs two byte buffers, of 4 GiB + each, could be decorated with @bigmemtest(size=_4G, memuse=2). + + The 'size' argument is normally passed to the decorated test method as an + extra argument. If 'dry_run' is true, the value passed to the test method + may be less than the requested value. If 'dry_run' is false, it means the + test doesn't support dummy runs when -M is not specified. + """ + def decorator(f): + def wrapper(self): + size = wrapper.size + memuse = wrapper.memuse + if not real_max_memuse: + maxsize = 5147 + else: + maxsize = size + + if ((real_max_memuse or not dry_run) + and real_max_memuse < maxsize * memuse): + raise unittest.SkipTest( + "not enough memory: %.1fG minimum needed" + % (size * memuse / (1024 ** 3))) + + if real_max_memuse and verbose: + print() + print(" ... expected peak memory use: {peak:.1f}G" + .format(peak=size * memuse / (1024 ** 3))) + watchdog = _MemoryWatchdog() + watchdog.start() + else: + watchdog = None + + try: + return f(self, maxsize) + finally: + if watchdog: + watchdog.stop() + + wrapper.size = size + wrapper.memuse = memuse + return wrapper + return decorator + +def bigaddrspacetest(f): + """Decorator for tests that fill the address space.""" + def wrapper(self): + if max_memuse < MAX_Py_ssize_t: + if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31: + raise unittest.SkipTest( + "not enough memory: try a 32-bit build instead") + else: + raise unittest.SkipTest( + "not enough memory: %.1fG minimum needed" + % (MAX_Py_ssize_t / (1024 ** 3))) + else: + return f(self) + return wrapper + +#======================================================================= +# unittest integration. + +class BasicTestRunner: + def run(self, test): + result = unittest.TestResult() + test(result) + return result + +def _id(obj): + return obj + +def requires_resource(resource): + if resource == 'gui' and not _is_gui_available(): + return unittest.skip(_is_gui_available.reason) + if is_resource_enabled(resource): + return _id + else: + return unittest.skip("resource {0!r} is not enabled".format(resource)) + +def cpython_only(test): + """ + Decorator for tests only applicable on CPython. + """ + return impl_detail(cpython=True)(test) + +def impl_detail(msg=None, **guards): + if check_impl_detail(**guards): + return _id + if msg is None: + guardnames, default = _parse_guards(guards) + if default: + msg = "implementation detail not available on {0}" + else: + msg = "implementation detail specific to {0}" + guardnames = sorted(guardnames.keys()) + msg = msg.format(' or '.join(guardnames)) + return unittest.skip(msg) + +def _parse_guards(guards): + # Returns a tuple ({platform_name: run_me}, default_value) + if not guards: + return ({'cpython': True}, False) + is_true = list(guards.values())[0] + assert list(guards.values()) == [is_true] * len(guards) # all True or all False + return (guards, not is_true) + +# Use the following check to guard CPython's implementation-specific tests -- +# or to run them only on the implementation(s) guarded by the arguments. +def check_impl_detail(**guards): + """This function returns True or False depending on the host platform. + Examples: + if check_impl_detail(): # only on CPython (default) + if check_impl_detail(jython=True): # only on Jython + if check_impl_detail(cpython=False): # everywhere except on CPython + """ + guards, default = _parse_guards(guards) + return guards.get(sys.implementation.name, default) + + +def no_tracing(func): + """Decorator to temporarily turn off tracing for the duration of a test.""" + if not hasattr(sys, 'gettrace'): + return func + else: + @functools.wraps(func) + def wrapper(*args, **kwargs): + original_trace = sys.gettrace() + try: + sys.settrace(None) + return func(*args, **kwargs) + finally: + sys.settrace(original_trace) + return wrapper + + +def refcount_test(test): + """Decorator for tests which involve reference counting. + + To start, the decorator does not run the test if is not run by CPython. + After that, any trace function is unset during the test to prevent + unexpected refcounts caused by the trace function. + + """ + return no_tracing(cpython_only(test)) + + +def _filter_suite(suite, pred): + """Recursively filter test cases in a suite based on a predicate.""" + newtests = [] + for test in suite._tests: + if isinstance(test, unittest.TestSuite): + _filter_suite(test, pred) + newtests.append(test) + else: + if pred(test): + newtests.append(test) + suite._tests = newtests + + +# By default, don't filter tests +_match_test_func = None + +_accept_test_patterns = None +_ignore_test_patterns = None + + +def match_test(test): + # Function used by support.run_unittest() and regrtest --list-cases + if _match_test_func is None: + return True + else: + return _match_test_func(test.id()) + + +def _is_full_match_test(pattern): + # If a pattern contains at least one dot, it's considered + # as a full test identifier. + # Example: 'test.test_os.FileTests.test_access'. + # + # ignore patterns which contain fnmatch patterns: '*', '?', '[...]' + # or '[!...]'. For example, ignore 'test_access*'. + return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern)) + + +def set_match_tests(accept_patterns=None, ignore_patterns=None): + global _match_test_func, _accept_test_patterns, _ignore_test_patterns + + + if accept_patterns is None: + accept_patterns = () + if ignore_patterns is None: + ignore_patterns = () + + accept_func = ignore_func = None + + if accept_patterns != _accept_test_patterns: + accept_patterns, accept_func = _compile_match_function(accept_patterns) + if ignore_patterns != _ignore_test_patterns: + ignore_patterns, ignore_func = _compile_match_function(ignore_patterns) + + # Create a copy since patterns can be mutable and so modified later + _accept_test_patterns = tuple(accept_patterns) + _ignore_test_patterns = tuple(ignore_patterns) + + if accept_func is not None or ignore_func is not None: + def match_function(test_id): + accept = True + ignore = False + if accept_func: + accept = accept_func(test_id) + if ignore_func: + ignore = ignore_func(test_id) + return accept and not ignore + + _match_test_func = match_function + + +def _compile_match_function(patterns): + if not patterns: + func = None + # set_match_tests(None) behaves as set_match_tests(()) + patterns = () + elif all(map(_is_full_match_test, patterns)): + # Simple case: all patterns are full test identifier. + # The test.bisect_cmd utility only uses such full test identifiers. + func = set(patterns).__contains__ + else: + import fnmatch + regex = '|'.join(map(fnmatch.translate, patterns)) + # The search *is* case sensitive on purpose: + # don't use flags=re.IGNORECASE + regex_match = re.compile(regex).match + + def match_test_regex(test_id): + if regex_match(test_id): + # The regex matches the whole identifier, for example + # 'test.test_os.FileTests.test_access'. + return True + else: + # Try to match parts of the test identifier. + # For example, split 'test.test_os.FileTests.test_access' + # into: 'test', 'test_os', 'FileTests' and 'test_access'. + return any(map(regex_match, test_id.split("."))) + + func = match_test_regex + + return patterns, func + + +#======================================================================= +# Check for the presence of docstrings. + +# Rather than trying to enumerate all the cases where docstrings may be +# disabled, we just check for that directly + +def _check_docstrings(): + """Just used to check if docstrings are enabled""" + +MISSING_C_DOCSTRINGS = (check_impl_detail() and + sys.platform != 'win32' and + not sysconfig.get_config_var('WITH_DOC_STRINGS')) + +HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and + not MISSING_C_DOCSTRINGS) + +requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, + "test requires docstrings") + + +#======================================================================= +# doctest driver. + +def run_doctest(module, verbosity=None, optionflags=0): + """Run doctest on the given module. Return (#failures, #tests). + + If optional argument verbosity is not specified (or is None), pass + support's belief about verbosity on to doctest. Else doctest's + usual behavior is used (it searches sys.argv for -v). + """ + + import doctest + + if verbosity is None: + verbosity = verbose + else: + verbosity = None + + f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags) + if f: + raise TestFailed("%d of %d doctests failed" % (f, t)) + if verbose: + print('doctest (%s) ... %d tests with zero failures' % + (module.__name__, t)) + return f, t + + +#======================================================================= +# Support for saving and restoring the imported modules. + +def flush_std_streams(): + if sys.stdout is not None: + sys.stdout.flush() + if sys.stderr is not None: + sys.stderr.flush() + + +def print_warning(msg): + # bpo-45410: Explicitly flush stdout to keep logs in order + flush_std_streams() + stream = print_warning.orig_stderr + for line in msg.splitlines(): + print(f"Warning -- {line}", file=stream) + stream.flush() + +# bpo-39983: Store the original sys.stderr at Python startup to be able to +# log warnings even if sys.stderr is captured temporarily by a test. +print_warning.orig_stderr = sys.stderr + + +# Flag used by saved_test_environment of test.libregrtest.save_env, +# to check if a test modified the environment. The flag should be set to False +# before running a new test. +# +# For example, threading_helper.threading_cleanup() sets the flag is the function fails +# to cleanup threads. +environment_altered = False + +def reap_children(): + """Use this function at the end of test_main() whenever sub-processes + are started. This will help ensure that no extra children (zombies) + stick around to hog resources and create problems when looking + for refleaks. + """ + global environment_altered + + # Need os.waitpid(-1, os.WNOHANG): Windows is not supported + if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')): + return + elif not has_subprocess_support: + return + + # Reap all our dead child processes so we don't leave zombies around. + # These hog resources and might be causing some of the buildbots to die. + while True: + try: + # Read the exit status of any child process which already completed + pid, status = os.waitpid(-1, os.WNOHANG) + except OSError: + break + + if pid == 0: + break + + print_warning(f"reap_children() reaped child process {pid}") + environment_altered = True + + +@contextlib.contextmanager +def swap_attr(obj, attr, new_val): + """Temporary swap out an attribute with a new object. + + Usage: + with swap_attr(obj, "attr", 5): + ... + + This will set obj.attr to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `attr` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. + + The old value (or None if it doesn't exist) will be assigned to the + target of the "as" clause, if there is one. + """ + if hasattr(obj, attr): + real_val = getattr(obj, attr) + setattr(obj, attr, new_val) + try: + yield real_val + finally: + setattr(obj, attr, real_val) + else: + setattr(obj, attr, new_val) + try: + yield + finally: + if hasattr(obj, attr): + delattr(obj, attr) + +@contextlib.contextmanager +def swap_item(obj, item, new_val): + """Temporary swap out an item with a new object. + + Usage: + with swap_item(obj, "item", 5): + ... + + This will set obj["item"] to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `item` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. + + The old value (or None if it doesn't exist) will be assigned to the + target of the "as" clause, if there is one. + """ + if item in obj: + real_val = obj[item] + obj[item] = new_val + try: + yield real_val + finally: + obj[item] = real_val + else: + obj[item] = new_val + try: + yield + finally: + if item in obj: + del obj[item] + +def args_from_interpreter_flags(): + """Return a list of command-line arguments reproducing the current + settings in sys.flags and sys.warnoptions.""" + import subprocess + return subprocess._args_from_interpreter_flags() + +def optim_args_from_interpreter_flags(): + """Return a list of command-line arguments reproducing the current + optimization settings in sys.flags.""" + import subprocess + return subprocess._optim_args_from_interpreter_flags() + + +class Matcher(object): + + _partial_matches = ('msg', 'message') + + def matches(self, d, **kwargs): + """ + Try to match a single dict with the supplied arguments. + + Keys whose values are strings and which are in self._partial_matches + will be checked for partial (i.e. substring) matches. You can extend + this scheme to (for example) do regular expression matching, etc. + """ + result = True + for k in kwargs: + v = kwargs[k] + dv = d.get(k) + if not self.match_value(k, dv, v): + result = False + break + return result + + def match_value(self, k, dv, v): + """ + Try to match a single stored value (dv) with a supplied value (v). + """ + if type(v) != type(dv): + result = False + elif type(dv) is not str or k not in self._partial_matches: + result = (v == dv) + else: + result = dv.find(v) >= 0 + return result + + +_buggy_ucrt = None +def skip_if_buggy_ucrt_strfptime(test): + """ + Skip decorator for tests that use buggy strptime/strftime + + If the UCRT bugs are present time.localtime().tm_zone will be + an empty string, otherwise we assume the UCRT bugs are fixed + + See bpo-37552 [Windows] strptime/strftime return invalid + results with UCRT version 17763.615 + """ + import locale + global _buggy_ucrt + if _buggy_ucrt is None: + if(sys.platform == 'win32' and + locale.getencoding() == 'cp65001' and + time.localtime().tm_zone == ''): + _buggy_ucrt = True + else: + _buggy_ucrt = False + return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test + +class PythonSymlink: + """Creates a symlink for the current Python executable""" + def __init__(self, link=None): + from .os_helper import TESTFN + + self.link = link or os.path.abspath(TESTFN) + self._linked = [] + self.real = os.path.realpath(sys.executable) + self._also_link = [] + + self._env = None + + self._platform_specific() + + if sys.platform == "win32": + def _platform_specific(self): + import glob + import _winapi + + if os.path.lexists(self.real) and not os.path.exists(self.real): + # App symlink appears to not exist, but we want the + # real executable here anyway + self.real = _winapi.GetModuleFileName(0) + + dll = _winapi.GetModuleFileName(sys.dllhandle) + src_dir = os.path.dirname(dll) + dest_dir = os.path.dirname(self.link) + self._also_link.append(( + dll, + os.path.join(dest_dir, os.path.basename(dll)) + )) + for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")): + self._also_link.append(( + runtime, + os.path.join(dest_dir, os.path.basename(runtime)) + )) + + self._env = {k.upper(): os.getenv(k) for k in os.environ} + self._env["PYTHONHOME"] = os.path.dirname(self.real) + if sysconfig.is_python_build(): + self._env["PYTHONPATH"] = STDLIB_DIR + else: + def _platform_specific(self): + pass + + def __enter__(self): + os.symlink(self.real, self.link) + self._linked.append(self.link) + for real, link in self._also_link: + os.symlink(real, link) + self._linked.append(link) + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + for link in self._linked: + try: + os.remove(link) + except IOError as ex: + if verbose: + print("failed to clean up {}: {}".format(link, ex)) + + def _call(self, python, args, env, returncode): + import subprocess + cmd = [python, *args] + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=env) + r = p.communicate() + if p.returncode != returncode: + if verbose: + print(repr(r[0])) + print(repr(r[1]), file=sys.stderr) + raise RuntimeError( + 'unexpected return code: {0} (0x{0:08X})'.format(p.returncode)) + return r + + def call_real(self, *args, returncode=0): + return self._call(self.real, args, None, returncode) + + def call_link(self, *args, returncode=0): + return self._call(self.link, args, self._env, returncode) + + +def skip_if_pgo_task(test): + """Skip decorator for tests not run in (non-extended) PGO task""" + ok = not PGO or PGO_EXTENDED + msg = "Not run for (non-extended) PGO task" + return test if ok else unittest.skip(msg)(test) + + +def detect_api_mismatch(ref_api, other_api, *, ignore=()): + """Returns the set of items in ref_api not in other_api, except for a + defined list of items to be ignored in this check. + + By default this skips private attributes beginning with '_' but + includes all magic methods, i.e. those starting and ending in '__'. + """ + missing_items = set(dir(ref_api)) - set(dir(other_api)) + if ignore: + missing_items -= set(ignore) + missing_items = set(m for m in missing_items + if not m.startswith('_') or m.endswith('__')) + return missing_items + + +def check__all__(test_case, module, name_of_module=None, extra=(), + not_exported=()): + """Assert that the __all__ variable of 'module' contains all public names. + + The module's public names (its API) are detected automatically based on + whether they match the public name convention and were defined in + 'module'. + + The 'name_of_module' argument can specify (as a string or tuple thereof) + what module(s) an API could be defined in in order to be detected as a + public API. One case for this is when 'module' imports part of its public + API from other modules, possibly a C backend (like 'csv' and its '_csv'). + + The 'extra' argument can be a set of names that wouldn't otherwise be + automatically detected as "public", like objects without a proper + '__module__' attribute. If provided, it will be added to the + automatically detected ones. + + The 'not_exported' argument can be a set of names that must not be treated + as part of the public API even though their names indicate otherwise. + + Usage: + import bar + import foo + import unittest + from test import support + + class MiscTestCase(unittest.TestCase): + def test__all__(self): + support.check__all__(self, foo) + + class OtherTestCase(unittest.TestCase): + def test__all__(self): + extra = {'BAR_CONST', 'FOO_CONST'} + not_exported = {'baz'} # Undocumented name. + # bar imports part of its API from _bar. + support.check__all__(self, bar, ('bar', '_bar'), + extra=extra, not_exported=not_exported) + + """ + + if name_of_module is None: + name_of_module = (module.__name__, ) + elif isinstance(name_of_module, str): + name_of_module = (name_of_module, ) + + expected = set(extra) + + for name in dir(module): + if name.startswith('_') or name in not_exported: + continue + obj = getattr(module, name) + if (getattr(obj, '__module__', None) in name_of_module or + (not hasattr(obj, '__module__') and + not isinstance(obj, types.ModuleType))): + expected.add(name) + test_case.assertCountEqual(module.__all__, expected) + + +def suppress_msvcrt_asserts(verbose=False): + try: + import msvcrt + except ImportError: + return + + msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS + | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT + | msvcrt.SEM_NOGPFAULTERRORBOX + | msvcrt.SEM_NOOPENFILEERRORBOX) + + # CrtSetReportMode() is only available in debug build + if hasattr(msvcrt, 'CrtSetReportMode'): + for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]: + if verbose: + msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE) + msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR) + else: + msvcrt.CrtSetReportMode(m, 0) + + +class SuppressCrashReport: + """Try to prevent a crash report from popping up. + + On Windows, don't display the Windows Error Reporting dialog. On UNIX, + disable the creation of coredump file. + """ + old_value = None + old_modes = None + + def __enter__(self): + """On Windows, disable Windows Error Reporting dialogs using + SetErrorMode() and CrtSetReportMode(). + + On UNIX, try to save the previous core file size limit, then set + soft limit to 0. + """ + if sys.platform.startswith('win'): + # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx + try: + import msvcrt + except ImportError: + return + + self.old_value = msvcrt.GetErrorMode() + + msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX) + + # bpo-23314: Suppress assert dialogs in debug builds. + # CrtSetReportMode() is only available in debug build. + if hasattr(msvcrt, 'CrtSetReportMode'): + self.old_modes = {} + for report_type in [msvcrt.CRT_WARN, + msvcrt.CRT_ERROR, + msvcrt.CRT_ASSERT]: + old_mode = msvcrt.CrtSetReportMode(report_type, + msvcrt.CRTDBG_MODE_FILE) + old_file = msvcrt.CrtSetReportFile(report_type, + msvcrt.CRTDBG_FILE_STDERR) + self.old_modes[report_type] = old_mode, old_file + + else: + try: + import resource + self.resource = resource + except ImportError: + self.resource = None + if self.resource is not None: + try: + self.old_value = self.resource.getrlimit(self.resource.RLIMIT_CORE) + self.resource.setrlimit(self.resource.RLIMIT_CORE, + (0, self.old_value[1])) + except (ValueError, OSError): + pass + + if sys.platform == 'darwin': + import subprocess + # Check if the 'Crash Reporter' on OSX was configured + # in 'Developer' mode and warn that it will get triggered + # when it is. + # + # This assumes that this context manager is used in tests + # that might trigger the next manager. + cmd = ['/usr/bin/defaults', 'read', + 'com.apple.CrashReporter', 'DialogType'] + proc = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + with proc: + stdout = proc.communicate()[0] + if stdout.strip() == b'developer': + print("this test triggers the Crash Reporter, " + "that is intentional", end='', flush=True) + + return self + + def __exit__(self, *ignore_exc): + """Restore Windows ErrorMode or core file behavior to initial value.""" + if self.old_value is None: + return + + if sys.platform.startswith('win'): + import msvcrt + msvcrt.SetErrorMode(self.old_value) + + if self.old_modes: + for report_type, (old_mode, old_file) in self.old_modes.items(): + msvcrt.CrtSetReportMode(report_type, old_mode) + msvcrt.CrtSetReportFile(report_type, old_file) + else: + if self.resource is not None: + try: + self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value) + except (ValueError, OSError): + pass + + +def patch(test_instance, object_to_patch, attr_name, new_value): + """Override 'object_to_patch'.'attr_name' with 'new_value'. + + Also, add a cleanup procedure to 'test_instance' to restore + 'object_to_patch' value for 'attr_name'. + The 'attr_name' should be a valid attribute for 'object_to_patch'. + + """ + # check that 'attr_name' is a real attribute for 'object_to_patch' + # will raise AttributeError if it does not exist + getattr(object_to_patch, attr_name) + + # keep a copy of the old value + attr_is_local = False + try: + old_value = object_to_patch.__dict__[attr_name] + except (AttributeError, KeyError): + old_value = getattr(object_to_patch, attr_name, None) + else: + attr_is_local = True + + # restore the value when the test is done + def cleanup(): + if attr_is_local: + setattr(object_to_patch, attr_name, old_value) + else: + delattr(object_to_patch, attr_name) + + test_instance.addCleanup(cleanup) + + # actually override the attribute + setattr(object_to_patch, attr_name, new_value) + + +@contextlib.contextmanager +def patch_list(orig): + """Like unittest.mock.patch.dict, but for lists.""" + try: + saved = orig[:] + yield + finally: + orig[:] = saved + + +def run_in_subinterp(code): + """ + Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc + module is enabled. + """ + # Issue #10915, #15751: PyGILState_*() functions don't work with + # sub-interpreters, the tracemalloc module uses these functions internally + try: + import tracemalloc + except ImportError: + pass + else: + if tracemalloc.is_tracing(): + raise unittest.SkipTest("run_in_subinterp() cannot be used " + "if tracemalloc module is tracing " + "memory allocations") + import _testcapi + return _testcapi.run_in_subinterp(code) + + +def check_free_after_iterating(test, iter, cls, args=()): + class A(cls): + def __del__(self): + nonlocal done + done = True + try: + next(it) + except StopIteration: + pass + + done = False + it = iter(A(*args)) + # Issue 26494: Shouldn't crash + test.assertRaises(StopIteration, next, it) + # The sequence should be deallocated just after the end of iterating + gc_collect() + test.assertTrue(done) + + +def missing_compiler_executable(cmd_names=[]): + """Check if the compiler components used to build the interpreter exist. + + Check for the existence of the compiler executables whose names are listed + in 'cmd_names' or all the compiler executables when 'cmd_names' is empty + and return the first missing executable or None when none is found + missing. + + """ + # TODO (PEP 632): alternate check without using distutils + from distutils import ccompiler, sysconfig, spawn, errors + compiler = ccompiler.new_compiler() + sysconfig.customize_compiler(compiler) + if compiler.compiler_type == "msvc": + # MSVC has no executables, so check whether initialization succeeds + try: + compiler.initialize() + except errors.DistutilsPlatformError: + return "msvc" + for name in compiler.executables: + if cmd_names and name not in cmd_names: + continue + cmd = getattr(compiler, name) + if cmd_names: + assert cmd is not None, \ + "the '%s' executable is not configured" % name + elif not cmd: + continue + if spawn.find_executable(cmd[0]) is None: + return cmd[0] + + +_is_android_emulator = None +def setswitchinterval(interval): + # Setting a very low gil interval on the Android emulator causes python + # to hang (issue #26939). + minimum_interval = 1e-5 + if is_android and interval < minimum_interval: + global _is_android_emulator + if _is_android_emulator is None: + import subprocess + _is_android_emulator = (subprocess.check_output( + ['getprop', 'ro.kernel.qemu']).strip() == b'1') + if _is_android_emulator: + interval = minimum_interval + return sys.setswitchinterval(interval) + + +@contextlib.contextmanager +def disable_faulthandler(): + import faulthandler + + # use sys.__stderr__ instead of sys.stderr, since regrtest replaces + # sys.stderr with a StringIO which has no file descriptor when a test + # is run with -W/--verbose3. + fd = sys.__stderr__.fileno() + + is_enabled = faulthandler.is_enabled() + try: + faulthandler.disable() + yield + finally: + if is_enabled: + faulthandler.enable(file=fd, all_threads=True) + + +class SaveSignals: + """ + Save and restore signal handlers. + + This class is only able to save/restore signal handlers registered + by the Python signal module: see bpo-13285 for "external" signal + handlers. + """ + + def __init__(self): + import signal + self.signal = signal + self.signals = signal.valid_signals() + # SIGKILL and SIGSTOP signals cannot be ignored nor caught + for signame in ('SIGKILL', 'SIGSTOP'): + try: + signum = getattr(signal, signame) + except AttributeError: + continue + self.signals.remove(signum) + self.handlers = {} + + def save(self): + for signum in self.signals: + handler = self.signal.getsignal(signum) + if handler is None: + # getsignal() returns None if a signal handler was not + # registered by the Python signal module, + # and the handler is not SIG_DFL nor SIG_IGN. + # + # Ignore the signal: we cannot restore the handler. + continue + self.handlers[signum] = handler + + def restore(self): + for signum, handler in self.handlers.items(): + self.signal.signal(signum, handler) + + +def with_pymalloc(): + import _testcapi + return _testcapi.WITH_PYMALLOC + + +class _ALWAYS_EQ: + """ + Object that is equal to anything. + """ + def __eq__(self, other): + return True + def __ne__(self, other): + return False + +ALWAYS_EQ = _ALWAYS_EQ() + +class _NEVER_EQ: + """ + Object that is not equal to anything. + """ + def __eq__(self, other): + return False + def __ne__(self, other): + return True + def __hash__(self): + return 1 + +NEVER_EQ = _NEVER_EQ() + +@functools.total_ordering +class _LARGEST: + """ + Object that is greater than anything (except itself). + """ + def __eq__(self, other): + return isinstance(other, _LARGEST) + def __lt__(self, other): + return False + +LARGEST = _LARGEST() + +@functools.total_ordering +class _SMALLEST: + """ + Object that is less than anything (except itself). + """ + def __eq__(self, other): + return isinstance(other, _SMALLEST) + def __gt__(self, other): + return False + +SMALLEST = _SMALLEST() + +def maybe_get_event_loop_policy(): + """Return the global event loop policy if one is set, else return None.""" + import asyncio.events + return asyncio.events._event_loop_policy + +# Helpers for testing hashing. +NHASHBITS = sys.hash_info.width # number of bits in hash() result +assert NHASHBITS in (32, 64) + +# Return mean and sdev of number of collisions when tossing nballs balls +# uniformly at random into nbins bins. By definition, the number of +# collisions is the number of balls minus the number of occupied bins at +# the end. +def collision_stats(nbins, nballs): + n, k = nbins, nballs + # prob a bin empty after k trials = (1 - 1/n)**k + # mean # empty is then n * (1 - 1/n)**k + # so mean # occupied is n - n * (1 - 1/n)**k + # so collisions = k - (n - n*(1 - 1/n)**k) + # + # For the variance: + # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 = + # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty) + # + # Massive cancellation occurs, and, e.g., for a 64-bit hash code + # 1-1/2**64 rounds uselessly to 1.0. Rather than make heroic (and + # error-prone) efforts to rework the naive formulas to avoid those, + # we use the `decimal` module to get plenty of extra precision. + # + # Note: the exact values are straightforward to compute with + # rationals, but in context that's unbearably slow, requiring + # multi-million bit arithmetic. + import decimal + with decimal.localcontext() as ctx: + bits = n.bit_length() * 2 # bits in n**2 + # At least that many bits will likely cancel out. + # Use that many decimal digits instead. + ctx.prec = max(bits, 30) + dn = decimal.Decimal(n) + p1empty = ((dn - 1) / dn) ** k + meanempty = n * p1empty + occupied = n - meanempty + collisions = k - occupied + var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty) + return float(collisions), float(var.sqrt()) + + +class catch_unraisable_exception: + """ + Context manager catching unraisable exception using sys.unraisablehook. + + Storing the exception value (cm.unraisable.exc_value) creates a reference + cycle. The reference cycle is broken explicitly when the context manager + exits. + + Storing the object (cm.unraisable.object) can resurrect it if it is set to + an object which is being finalized. Exiting the context manager clears the + stored object. + + Usage: + + with support.catch_unraisable_exception() as cm: + # code creating an "unraisable exception" + ... + + # check the unraisable exception: use cm.unraisable + ... + + # cm.unraisable attribute no longer exists at this point + # (to break a reference cycle) + """ + + def __init__(self): + self.unraisable = None + self._old_hook = None + + def _hook(self, unraisable): + # Storing unraisable.object can resurrect an object which is being + # finalized. Storing unraisable.exc_value creates a reference cycle. + self.unraisable = unraisable + + def __enter__(self): + self._old_hook = sys.unraisablehook + sys.unraisablehook = self._hook + return self + + def __exit__(self, *exc_info): + sys.unraisablehook = self._old_hook + del self.unraisable + + +def wait_process(pid, *, exitcode, timeout=None): + """ + Wait until process pid completes and check that the process exit code is + exitcode. + + Raise an AssertionError if the process exit code is not equal to exitcode. + + If the process runs longer than timeout seconds (LONG_TIMEOUT by default), + kill the process (if signal.SIGKILL is available) and raise an + AssertionError. The timeout feature is not available on Windows. + """ + if os.name != "nt": + import signal + + if timeout is None: + timeout = LONG_TIMEOUT + t0 = time.monotonic() + sleep = 0.001 + max_sleep = 0.1 + while True: + pid2, status = os.waitpid(pid, os.WNOHANG) + if pid2 != 0: + break + # process is still running + + dt = time.monotonic() - t0 + if dt > timeout: + try: + os.kill(pid, signal.SIGKILL) + os.waitpid(pid, 0) + except OSError: + # Ignore errors like ChildProcessError or PermissionError + pass + + raise AssertionError(f"process {pid} is still running " + f"after {dt:.1f} seconds") + + sleep = min(sleep * 2, max_sleep) + time.sleep(sleep) + else: + # Windows implementation + pid2, status = os.waitpid(pid, 0) + + exitcode2 = os.waitstatus_to_exitcode(status) + if exitcode2 != exitcode: + raise AssertionError(f"process {pid} exited with code {exitcode2}, " + f"but exit code {exitcode} is expected") + + # sanity check: it should not fail in practice + if pid2 != pid: + raise AssertionError(f"pid {pid2} != pid {pid}") + +def check_disallow_instantiation(testcase, tp, *args, **kwds): + """ + Check that given type cannot be instantiated using *args and **kwds. + + See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag. + """ + mod = tp.__module__ + name = tp.__name__ + if mod != 'builtins': + qualname = f"{mod}.{name}" + else: + qualname = f"{name}" + msg = f"cannot create '{re.escape(qualname)}' instances" + testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds) + +@contextlib.contextmanager +def infinite_recursion(max_depth=75): + """Set a lower limit for tests that interact with infinite recursions + (e.g test_ast.ASTHelpers_Test.test_recursion_direct) since on some + debug windows builds, due to not enough functions being inlined the + stack size might not handle the default recursion limit (1000). See + bpo-11105 for details.""" + + original_depth = sys.getrecursionlimit() + try: + sys.setrecursionlimit(max_depth) + yield + finally: + sys.setrecursionlimit(original_depth) + +def ignore_deprecations_from(module: str, *, like: str) -> object: + token = object() + warnings.filterwarnings( + "ignore", + category=DeprecationWarning, + module=module, + message=like + fr"(?#support{id(token)})", + ) + return token + +def clear_ignored_deprecations(*tokens: object) -> None: + if not tokens: + raise ValueError("Provide token or tokens returned by ignore_deprecations_from") + + new_filters = [] + endswith = tuple(rf"(?#support{id(token)})" for token in tokens) + for action, message, category, module, lineno in warnings.filters: + if action == "ignore" and category is DeprecationWarning: + if isinstance(message, re.Pattern): + msg = message.pattern + else: + msg = message or "" + if msg.endswith(endswith): + continue + new_filters.append((action, message, category, module, lineno)) + if warnings.filters != new_filters: + warnings.filters[:] = new_filters + warnings._filters_mutated() + + +# Skip a test if venv with pip is known to not work. +def requires_venv_with_pip(): + # ensurepip requires zlib to open ZIP archives (.whl binary wheel packages) + try: + import zlib + except ImportError: + return unittest.skipIf(True, "venv: ensurepip requires zlib") + + # bpo-26610: pip/pep425tags.py requires ctypes. + # gh-92820: setuptools/windows_support.py uses ctypes (setuptools 58.1). + try: + import ctypes + except ImportError: + ctypes = None + return unittest.skipUnless(ctypes, 'venv: pip requires ctypes') + + +@contextlib.contextmanager +def adjust_int_max_str_digits(max_digits): + """Temporarily change the integer string conversion length limit.""" + current = sys.get_int_max_str_digits() + try: + sys.set_int_max_str_digits(max_digits) + yield + finally: + sys.set_int_max_str_digits(current) diff --git a/test/support/os_helper.py b/test/support/os_helper.py index e6e4a6f..f599cc7 100644 --- a/test/support/os_helper.py +++ b/test/support/os_helper.py @@ -1,4 +1,717 @@ -"""Enough of the test.support.os_helper APIs to run the contextlib test suite""" +import collections.abc +import contextlib +import errno import os +import re +import stat +import sys +import time +import unittest +import warnings -unlink = os.unlink + +# Filename used for testing +if os.name == 'java': + # Jython disallows @ in module names + TESTFN_ASCII = '$test' +else: + TESTFN_ASCII = '@test' + +# Disambiguate TESTFN for parallel testing, while letting it remain a valid +# module name. +TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid()) + +# TESTFN_UNICODE is a non-ascii filename +TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f" +if sys.platform == 'darwin': + # In Mac OS X's VFS API file names are, by definition, canonically + # decomposed Unicode, encoded using UTF-8. See QA1173: + # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html + import unicodedata + TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE) + +# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be +# encoded by the filesystem encoding (in strict mode). It can be None if we +# cannot generate such filename. +TESTFN_UNENCODABLE = None +if os.name == 'nt': + # skip win32s (0) or Windows 9x/ME (1) + if sys.getwindowsversion().platform >= 2: + # Different kinds of characters from various languages to minimize the + # probability that the whole name is encodable to MBCS (issue #9819) + TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80" + try: + TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding()) + except UnicodeEncodeError: + pass + else: + print('WARNING: The filename %r CAN be encoded by the filesystem ' + 'encoding (%s). Unicode filename tests may not be effective' + % (TESTFN_UNENCODABLE, sys.getfilesystemencoding())) + TESTFN_UNENCODABLE = None +# macOS and Emscripten deny unencodable filenames (invalid utf-8) +elif sys.platform not in {'darwin', 'emscripten', 'wasi'}: + try: + # ascii and utf-8 cannot encode the byte 0xff + b'\xff'.decode(sys.getfilesystemencoding()) + except UnicodeDecodeError: + # 0xff will be encoded using the surrogate character u+DCFF + TESTFN_UNENCODABLE = TESTFN_ASCII \ + + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape') + else: + # File system encoding (eg. ISO-8859-* encodings) can encode + # the byte 0xff. Skip some unicode filename tests. + pass + +# FS_NONASCII: non-ASCII character encodable by os.fsencode(), +# or an empty string if there is no such character. +FS_NONASCII = '' +for character in ( + # First try printable and common characters to have a readable filename. + # For each character, the encoding list are just example of encodings able + # to encode the character (the list is not exhaustive). + + # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 + '\u00E6', + # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 + '\u0130', + # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 + '\u0141', + # U+03C6 (Greek Small Letter Phi): cp1253 + '\u03C6', + # U+041A (Cyrillic Capital Letter Ka): cp1251 + '\u041A', + # U+05D0 (Hebrew Letter Alef): Encodable to cp424 + '\u05D0', + # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic + '\u060C', + # U+062A (Arabic Letter Teh): cp720 + '\u062A', + # U+0E01 (Thai Character Ko Kai): cp874 + '\u0E01', + + # Then try more "special" characters. "special" because they may be + # interpreted or displayed differently depending on the exact locale + # encoding and the font. + + # U+00A0 (No-Break Space) + '\u00A0', + # U+20AC (Euro Sign) + '\u20AC', +): + try: + # If Python is set up to use the legacy 'mbcs' in Windows, + # 'replace' error mode is used, and encode() returns b'?' + # for characters missing in the ANSI codepage + if os.fsdecode(os.fsencode(character)) != character: + raise UnicodeError + except UnicodeError: + pass + else: + FS_NONASCII = character + break + +# Save the initial cwd +SAVEDCWD = os.getcwd() + +# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be +# decoded from the filesystem encoding (in strict mode). It can be None if we +# cannot generate such filename (ex: the latin1 encoding can decode any byte +# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks +# to the surrogateescape error handler (PEP 383), but not from the filesystem +# encoding in strict mode. +TESTFN_UNDECODABLE = None +for name in ( + # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows + # accepts it to create a file or a directory, or don't accept to enter to + # such directory (when the bytes name is used). So test b'\xe7' first: + # it is not decodable from cp932. + b'\xe7w\xf0', + # undecodable from ASCII, UTF-8 + b'\xff', + # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856 + # and cp857 + b'\xae\xd5' + # undecodable from UTF-8 (UNIX and Mac OS X) + b'\xed\xb2\x80', b'\xed\xb4\x80', + # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252, + # cp1253, cp1254, cp1255, cp1257, cp1258 + b'\x81\x98', +): + try: + name.decode(sys.getfilesystemencoding()) + except UnicodeDecodeError: + TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name + break + +if FS_NONASCII: + TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII +else: + TESTFN_NONASCII = None +TESTFN = TESTFN_NONASCII or TESTFN_ASCII + + +def make_bad_fd(): + """ + Create an invalid file descriptor by opening and closing a file and return + its fd. + """ + file = open(TESTFN, "wb") + try: + return file.fileno() + finally: + file.close() + unlink(TESTFN) + + +_can_symlink = None + + +def can_symlink(): + global _can_symlink + if _can_symlink is not None: + return _can_symlink + # WASI / wasmtime prevents symlinks with absolute paths, see man + # openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute + # paths. Skip symlink tests on WASI for now. + src = os.path.abspath(TESTFN) + symlink_path = src + "can_symlink" + try: + os.symlink(src, symlink_path) + can = True + except (OSError, NotImplementedError, AttributeError): + can = False + else: + os.remove(symlink_path) + _can_symlink = can + return can + + +def skip_unless_symlink(test): + """Skip decorator for tests that require functional symlink""" + ok = can_symlink() + msg = "Requires functional symlink implementation" + return test if ok else unittest.skip(msg)(test) + + +_can_xattr = None + + +def can_xattr(): + import tempfile + global _can_xattr + if _can_xattr is not None: + return _can_xattr + if not hasattr(os, "setxattr"): + can = False + else: + import platform + tmp_dir = tempfile.mkdtemp() + tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir) + try: + with open(TESTFN, "wb") as fp: + try: + # TESTFN & tempfile may use different file systems with + # different capabilities + os.setxattr(tmp_fp, b"user.test", b"") + os.setxattr(tmp_name, b"trusted.foo", b"42") + os.setxattr(fp.fileno(), b"user.test", b"") + # Kernels < 2.6.39 don't respect setxattr flags. + kernel_version = platform.release() + m = re.match(r"2.6.(\d{1,2})", kernel_version) + can = m is None or int(m.group(1)) >= 39 + except OSError: + can = False + finally: + unlink(TESTFN) + unlink(tmp_name) + rmdir(tmp_dir) + _can_xattr = can + return can + + +def skip_unless_xattr(test): + """Skip decorator for tests that require functional extended attributes""" + ok = can_xattr() + msg = "no non-broken extended attribute support" + return test if ok else unittest.skip(msg)(test) + + +_can_chmod = None + +def can_chmod(): + global _can_chmod + if _can_chmod is not None: + return _can_chmod + if not hasattr(os, "chown"): + _can_chmod = False + return _can_chmod + try: + with open(TESTFN, "wb") as f: + try: + os.chmod(TESTFN, 0o777) + mode1 = os.stat(TESTFN).st_mode + os.chmod(TESTFN, 0o666) + mode2 = os.stat(TESTFN).st_mode + except OSError as e: + can = False + else: + can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2) + finally: + unlink(TESTFN) + _can_chmod = can + return can + + +def skip_unless_working_chmod(test): + """Skip tests that require working os.chmod() + + WASI SDK 15.0 cannot change file mode bits. + """ + ok = can_chmod() + msg = "requires working os.chmod()" + return test if ok else unittest.skip(msg)(test) + + +# Check whether the current effective user has the capability to override +# DAC (discretionary access control). Typically user root is able to +# bypass file read, write, and execute permission checks. The capability +# is independent of the effective user. See capabilities(7). +_can_dac_override = None + +def can_dac_override(): + global _can_dac_override + + if not can_chmod(): + _can_dac_override = False + if _can_dac_override is not None: + return _can_dac_override + + try: + with open(TESTFN, "wb") as f: + os.chmod(TESTFN, 0o400) + try: + with open(TESTFN, "wb"): + pass + except OSError: + _can_dac_override = False + else: + _can_dac_override = True + finally: + unlink(TESTFN) + + return _can_dac_override + + +def skip_if_dac_override(test): + ok = not can_dac_override() + msg = "incompatible with CAP_DAC_OVERRIDE" + return test if ok else unittest.skip(msg)(test) + + +def skip_unless_dac_override(test): + ok = can_dac_override() + msg = "requires CAP_DAC_OVERRIDE" + return test if ok else unittest.skip(msg)(test) + + +def unlink(filename): + try: + _unlink(filename) + except (FileNotFoundError, NotADirectoryError): + pass + + +if sys.platform.startswith("win"): + def _waitfor(func, pathname, waitall=False): + # Perform the operation + func(pathname) + # Now setup the wait loop + if waitall: + dirname = pathname + else: + dirname, name = os.path.split(pathname) + dirname = dirname or '.' + # Check for `pathname` to be removed from the filesystem. + # The exponential backoff of the timeout amounts to a total + # of ~1 second after which the deletion is probably an error + # anyway. + # Testing on an i7@4.3GHz shows that usually only 1 iteration is + # required when contention occurs. + timeout = 0.001 + while timeout < 1.0: + # Note we are only testing for the existence of the file(s) in + # the contents of the directory regardless of any security or + # access rights. If we have made it this far, we have sufficient + # permissions to do that much using Python's equivalent of the + # Windows API FindFirstFile. + # Other Windows APIs can fail or give incorrect results when + # dealing with files that are pending deletion. + L = os.listdir(dirname) + if not (L if waitall else name in L): + return + # Increase the timeout and try again + time.sleep(timeout) + timeout *= 2 + warnings.warn('tests may fail, delete still pending for ' + pathname, + RuntimeWarning, stacklevel=4) + + def _unlink(filename): + _waitfor(os.unlink, filename) + + def _rmdir(dirname): + _waitfor(os.rmdir, dirname) + + def _rmtree(path): + from test.support import _force_run + + def _rmtree_inner(path): + for name in _force_run(path, os.listdir, path): + fullname = os.path.join(path, name) + try: + mode = os.lstat(fullname).st_mode + except OSError as exc: + print("support.rmtree(): os.lstat(%r) failed with %s" + % (fullname, exc), + file=sys.__stderr__) + mode = 0 + if stat.S_ISDIR(mode): + _waitfor(_rmtree_inner, fullname, waitall=True) + _force_run(fullname, os.rmdir, fullname) + else: + _force_run(fullname, os.unlink, fullname) + _waitfor(_rmtree_inner, path, waitall=True) + _waitfor(lambda p: _force_run(p, os.rmdir, p), path) + + def _longpath(path): + try: + import ctypes + except ImportError: + # No ctypes means we can't expands paths. + pass + else: + buffer = ctypes.create_unicode_buffer(len(path) * 2) + length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer, + len(buffer)) + if length: + return buffer[:length] + return path +else: + _unlink = os.unlink + _rmdir = os.rmdir + + def _rmtree(path): + import shutil + try: + shutil.rmtree(path) + return + except OSError: + pass + + def _rmtree_inner(path): + from test.support import _force_run + for name in _force_run(path, os.listdir, path): + fullname = os.path.join(path, name) + try: + mode = os.lstat(fullname).st_mode + except OSError: + mode = 0 + if stat.S_ISDIR(mode): + _rmtree_inner(fullname) + _force_run(path, os.rmdir, fullname) + else: + _force_run(path, os.unlink, fullname) + _rmtree_inner(path) + os.rmdir(path) + + def _longpath(path): + return path + + +def rmdir(dirname): + try: + _rmdir(dirname) + except FileNotFoundError: + pass + + +def rmtree(path): + try: + _rmtree(path) + except FileNotFoundError: + pass + + +@contextlib.contextmanager +def temp_dir(path=None, quiet=False): + """Return a context manager that creates a temporary directory. + + Arguments: + + path: the directory to create temporarily. If omitted or None, + defaults to creating a temporary directory using tempfile.mkdtemp. + + quiet: if False (the default), the context manager raises an exception + on error. Otherwise, if the path is specified and cannot be + created, only a warning is issued. + + """ + import tempfile + dir_created = False + if path is None: + path = tempfile.mkdtemp() + dir_created = True + path = os.path.realpath(path) + else: + try: + os.mkdir(path) + dir_created = True + except OSError as exc: + if not quiet: + raise + warnings.warn(f'tests may fail, unable to create ' + f'temporary directory {path!r}: {exc}', + RuntimeWarning, stacklevel=3) + if dir_created: + pid = os.getpid() + try: + yield path + finally: + # In case the process forks, let only the parent remove the + # directory. The child has a different process id. (bpo-30028) + if dir_created and pid == os.getpid(): + rmtree(path) + + +@contextlib.contextmanager +def change_cwd(path, quiet=False): + """Return a context manager that changes the current working directory. + + Arguments: + + path: the directory to use as the temporary current working directory. + + quiet: if False (the default), the context manager raises an exception + on error. Otherwise, it issues only a warning and keeps the current + working directory the same. + + """ + saved_dir = os.getcwd() + try: + os.chdir(os.path.realpath(path)) + except OSError as exc: + if not quiet: + raise + warnings.warn(f'tests may fail, unable to change the current working ' + f'directory to {path!r}: {exc}', + RuntimeWarning, stacklevel=3) + try: + yield os.getcwd() + finally: + os.chdir(saved_dir) + + +@contextlib.contextmanager +def temp_cwd(name='tempcwd', quiet=False): + """ + Context manager that temporarily creates and changes the CWD. + + The function temporarily changes the current working directory + after creating a temporary directory in the current directory with + name *name*. If *name* is None, the temporary directory is + created using tempfile.mkdtemp. + + If *quiet* is False (default) and it is not possible to + create or change the CWD, an error is raised. If *quiet* is True, + only a warning is raised and the original CWD is used. + + """ + with temp_dir(path=name, quiet=quiet) as temp_path: + with change_cwd(temp_path, quiet=quiet) as cwd_dir: + yield cwd_dir + + +def create_empty_file(filename): + """Create an empty file. If the file already exists, truncate it.""" + fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) + os.close(fd) + + +@contextlib.contextmanager +def open_dir_fd(path): + """Open a file descriptor to a directory.""" + assert os.path.isdir(path) + flags = os.O_RDONLY + if hasattr(os, "O_DIRECTORY"): + flags |= os.O_DIRECTORY + dir_fd = os.open(path, flags) + try: + yield dir_fd + finally: + os.close(dir_fd) + + +def fs_is_case_insensitive(directory): + """Detects if the file system for the specified directory + is case-insensitive.""" + import tempfile + with tempfile.NamedTemporaryFile(dir=directory) as base: + base_path = base.name + case_path = base_path.upper() + if case_path == base_path: + case_path = base_path.lower() + try: + return os.path.samefile(base_path, case_path) + except FileNotFoundError: + return False + + +class FakePath: + """Simple implementing of the path protocol. + """ + def __init__(self, path): + self.path = path + + def __repr__(self): + return f'' + + def __fspath__(self): + if (isinstance(self.path, BaseException) or + isinstance(self.path, type) and + issubclass(self.path, BaseException)): + raise self.path + else: + return self.path + + +def fd_count(): + """Count the number of open file descriptors. + """ + if sys.platform.startswith(('linux', 'freebsd', 'emscripten')): + try: + names = os.listdir("/proc/self/fd") + # Subtract one because listdir() internally opens a file + # descriptor to list the content of the /proc/self/fd/ directory. + return len(names) - 1 + except FileNotFoundError: + pass + + MAXFD = 256 + if hasattr(os, 'sysconf'): + try: + MAXFD = os.sysconf("SC_OPEN_MAX") + except OSError: + pass + + old_modes = None + if sys.platform == 'win32': + # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process + # on invalid file descriptor if Python is compiled in debug mode + try: + import msvcrt + msvcrt.CrtSetReportMode + except (AttributeError, ImportError): + # no msvcrt or a release build + pass + else: + old_modes = {} + for report_type in (msvcrt.CRT_WARN, + msvcrt.CRT_ERROR, + msvcrt.CRT_ASSERT): + old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, + 0) + + try: + count = 0 + for fd in range(MAXFD): + try: + # Prefer dup() over fstat(). fstat() can require input/output + # whereas dup() doesn't. + fd2 = os.dup(fd) + except OSError as e: + if e.errno != errno.EBADF: + raise + else: + os.close(fd2) + count += 1 + finally: + if old_modes is not None: + for report_type in (msvcrt.CRT_WARN, + msvcrt.CRT_ERROR, + msvcrt.CRT_ASSERT): + msvcrt.CrtSetReportMode(report_type, old_modes[report_type]) + + return count + + +if hasattr(os, "umask"): + @contextlib.contextmanager + def temp_umask(umask): + """Context manager that temporarily sets the process umask.""" + oldmask = os.umask(umask) + try: + yield + finally: + os.umask(oldmask) +else: + @contextlib.contextmanager + def temp_umask(umask): + """no-op on platforms without umask()""" + yield + + +class EnvironmentVarGuard(collections.abc.MutableMapping): + + """Class to help protect the environment variable properly. Can be used as + a context manager.""" + + def __init__(self): + self._environ = os.environ + self._changed = {} + + def __getitem__(self, envvar): + return self._environ[envvar] + + def __setitem__(self, envvar, value): + # Remember the initial value on the first access + if envvar not in self._changed: + self._changed[envvar] = self._environ.get(envvar) + self._environ[envvar] = value + + def __delitem__(self, envvar): + # Remember the initial value on the first access + if envvar not in self._changed: + self._changed[envvar] = self._environ.get(envvar) + if envvar in self._environ: + del self._environ[envvar] + + def keys(self): + return self._environ.keys() + + def __iter__(self): + return iter(self._environ) + + def __len__(self): + return len(self._environ) + + def set(self, envvar, value): + self[envvar] = value + + def unset(self, envvar): + del self[envvar] + + def copy(self): + # We do what os.environ.copy() does. + return dict(self) + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + for (k, v) in self._changed.items(): + if v is None: + if k in self._environ: + del self._environ[k] + else: + self._environ[k] = v + os.environ = self._environ diff --git a/test/test_contextlib.py b/test/test_contextlib.py index f09090a..7a91107 100644 --- a/test/test_contextlib.py +++ b/test/test_contextlib.py @@ -1,15 +1,16 @@ """Unit tests for contextlib.py, and other context managers.""" import io +import os import sys import tempfile import threading +import traceback import unittest from contextlib2 import * # Tests __all__ from test import support from test.support import os_helper import weakref -import gc class TestAbstractContextManager(unittest.TestCase): @@ -87,6 +88,32 @@ class ContextManagerTestCase(unittest.TestCase): raise ZeroDivisionError() self.assertEqual(state, [1, 42, 999]) + def test_contextmanager_traceback(self): + @contextmanager + def f(): + yield + + try: + with f(): + 1/0 + except ZeroDivisionError as e: + frames = traceback.extract_tb(e.__traceback__) + + self.assertEqual(len(frames), 1) + self.assertEqual(frames[0].name, 'test_contextmanager_traceback') + self.assertEqual(frames[0].line, '1/0') + + # Repeat with RuntimeError (which goes through a different code path) + try: + with f(): + raise NotImplementedError(42) + except NotImplementedError as e: + frames = traceback.extract_tb(e.__traceback__) + + self.assertEqual(len(frames), 1) + self.assertEqual(frames[0].name, 'test_contextmanager_traceback') + self.assertEqual(frames[0].line, 'raise NotImplementedError(42)') + def test_contextmanager_no_reraise(self): @contextmanager def whee(): @@ -127,19 +154,22 @@ class ContextManagerTestCase(unittest.TestCase): self.assertEqual(state, [1, 42, 999]) def test_contextmanager_except_stopiter(self): - stop_exc = StopIteration('spam') @contextmanager def woohoo(): yield - try: - with self.assertWarnsRegex(DeprecationWarning, - "StopIteration"): - with woohoo(): - raise stop_exc - except Exception as ex: - self.assertIs(ex, stop_exc) - else: - self.fail('StopIteration was suppressed') + + class StopIterationSubclass(StopIteration): + pass + + for stop_exc in (StopIteration('spam'), StopIterationSubclass('spam')): + with self.subTest(type=type(stop_exc)): + try: + with woohoo(): + raise stop_exc + except Exception as ex: + self.assertIs(ex, stop_exc) + else: + self.fail(f'{stop_exc} was suppressed') def test_contextmanager_except_pep479(self): code = """\ @@ -230,7 +260,7 @@ def woohoo(): a = weakref.ref(a) b = weakref.ref(b) # Allow test to work with a non-refcounted GC - gc.collect(); gc.collect(); gc.collect() + support.gc_collect() self.assertIsNone(a()) self.assertIsNone(b()) yield @@ -493,10 +523,14 @@ class TestContextDecorator(unittest.TestCase): pass def __exit__(self, *exc): pass - - with self.assertRaises(AttributeError): - with mycontext(): - pass + if sys.version_info >= (3, 11): + with self.assertRaisesRegex(TypeError, 'the context manager'): + with mycontext(): + pass + else: + with self.assertRaises(AttributeError): + with mycontext(): + pass def test_typo_exit(self): @@ -506,9 +540,15 @@ class TestContextDecorator(unittest.TestCase): def __uxit__(self, *exc): pass - with self.assertRaises(AttributeError): - with mycontext(): - pass + if sys.version_info >= (3, 11): + with self.assertRaisesRegex(TypeError, 'the context manager.*__exit__'): + with mycontext(): + pass + else: + with self.assertRaisesRegex(AttributeError, '__exit__'): + with mycontext(): + pass + def test_contextdecorator_as_mixin(self): @@ -664,6 +704,25 @@ class TestBaseExitStack: result.append(2) self.assertEqual(result, [1, 2, 3, 4]) + def test_enter_context_errors(self): + class LacksEnterAndExit: + pass + class LacksEnter: + def __exit__(self, *exc_info): + pass + class LacksExit: + def __enter__(self): + pass + + with self.exit_stack() as stack: + with self.assertRaisesRegex(TypeError, 'the context manager'): + stack.enter_context(LacksEnterAndExit()) + with self.assertRaisesRegex(TypeError, 'the context manager'): + stack.enter_context(LacksEnter()) + with self.assertRaisesRegex(TypeError, 'the context manager'): + stack.enter_context(LacksExit()) + self.assertFalse(stack._exit_callbacks) + def test_close(self): result = [] with self.exit_stack() as stack: @@ -699,6 +758,38 @@ class TestBaseExitStack: stack.push(lambda *exc: True) 1/0 + def test_exit_exception_traceback(self): + # This test captures the current behavior of ExitStack so that we know + # if we ever unintendedly change it. It is not a statement of what the + # desired behavior is (for instance, we may want to remove some of the + # internal contextlib frames). + + def raise_exc(exc): + raise exc + + try: + with self.exit_stack() as stack: + stack.callback(raise_exc, ValueError) + 1/0 + except ValueError as e: + exc = e + + self.assertIsInstance(exc, ValueError) + ve_frames = traceback.extract_tb(exc.__traceback__) + expected = \ + [('test_exit_exception_traceback', 'with self.exit_stack() as stack:')] + \ + self.callback_error_internal_frames + \ + [('_exit_wrapper', 'callback(*args, **kwds)'), + ('raise_exc', 'raise exc')] + + self.assertEqual( + [(f.name, f.line) for f in ve_frames], expected) + + self.assertIsInstance(exc.__context__, ZeroDivisionError) + zde_frames = traceback.extract_tb(exc.__context__.__traceback__) + self.assertEqual([(f.name, f.line) for f in zde_frames], + [('test_exit_exception_traceback', '1/0')]) + def test_exit_exception_chaining_reference(self): # Sanity check to make sure that ExitStack chaining matches # actual nested with statements @@ -778,6 +869,40 @@ class TestBaseExitStack: self.assertIsInstance(inner_exc, ValueError) self.assertIsInstance(inner_exc.__context__, ZeroDivisionError) + def test_exit_exception_explicit_none_context(self): + # Ensure ExitStack chaining matches actual nested `with` statements + # regarding explicit __context__ = None. + + class MyException(Exception): + pass + + @contextmanager + def my_cm(): + try: + yield + except BaseException: + exc = MyException() + try: + raise exc + finally: + exc.__context__ = None + + @contextmanager + def my_cm_with_exit_stack(): + with self.exit_stack() as stack: + stack.enter_context(my_cm()) + yield stack + + for cm in (my_cm, my_cm_with_exit_stack): + with self.subTest(): + try: + with cm(): + raise IndexError() + except MyException as exc: + self.assertIsNone(exc.__context__) + else: + self.fail("Expected IndexError, but no exception was raised") + def test_exit_exception_non_suppressing(self): # http://bugs.python.org/issue19092 def raise_exc(exc): @@ -889,9 +1014,11 @@ class TestBaseExitStack: def test_instance_bypass(self): class Example(object): pass cm = Example() + cm.__enter__ = object() cm.__exit__ = object() stack = self.exit_stack() - self.assertRaises(AttributeError, stack.enter_context, cm) + with self.assertRaisesRegex(TypeError, 'the context manager'): + stack.enter_context(cm) stack.push(cm) self.assertIs(stack._exit_callbacks[-1][1], cm) @@ -932,6 +1059,10 @@ class TestBaseExitStack: class TestExitStack(TestBaseExitStack, unittest.TestCase): exit_stack = ExitStack + callback_error_internal_frames = [ + ('__exit__', 'raise exc_details[1]'), + ('__exit__', 'if cb(*exc_details):'), + ] class TestRedirectStream: @@ -1057,5 +1188,53 @@ class TestSuppress(unittest.TestCase): 1/0 self.assertTrue(outer_continued) + +class TestChdir(unittest.TestCase): + def make_relative_path(self, *parts): + return os.path.join( + os.path.dirname(os.path.realpath(__file__)), + *parts, + ) + + def test_simple(self): + old_cwd = os.getcwd() + target = self.make_relative_path('data') + self.assertNotEqual(old_cwd, target) + + with chdir(target): + self.assertEqual(os.getcwd(), target) + self.assertEqual(os.getcwd(), old_cwd) + + def test_reentrant(self): + old_cwd = os.getcwd() + target1 = self.make_relative_path('data') + target2 = self.make_relative_path('ziptestdata') + self.assertNotIn(old_cwd, (target1, target2)) + chdir1, chdir2 = chdir(target1), chdir(target2) + + with chdir1: + self.assertEqual(os.getcwd(), target1) + with chdir2: + self.assertEqual(os.getcwd(), target2) + with chdir1: + self.assertEqual(os.getcwd(), target1) + self.assertEqual(os.getcwd(), target2) + self.assertEqual(os.getcwd(), target1) + self.assertEqual(os.getcwd(), old_cwd) + + def test_exception(self): + old_cwd = os.getcwd() + target = self.make_relative_path('data') + self.assertNotEqual(old_cwd, target) + + try: + with chdir(target): + self.assertEqual(os.getcwd(), target) + raise RuntimeError("boom") + except RuntimeError as re: + self.assertEqual(str(re), "boom") + self.assertEqual(os.getcwd(), old_cwd) + + if __name__ == "__main__": unittest.main() diff --git a/test/test_contextlib_async.py b/test/test_contextlib_async.py index eb2ed72..7475c51 100644 --- a/test/test_contextlib_async.py +++ b/test/test_contextlib_async.py @@ -8,21 +8,19 @@ import unittest from test.test_contextlib import TestBaseExitStack +support.requires_working_socket(module=True) def _async_test(func): """Decorator to turn an async function into a test case.""" @functools.wraps(func) def wrapper(*args, **kwargs): coro = func(*args, **kwargs) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete(coro) - finally: - loop.close() - asyncio.set_event_loop_policy(None) + asyncio.run(coro) return wrapper +def tearDownModule(): + asyncio.set_event_loop_policy(None) + class TestAbstractAsyncContextManager(unittest.TestCase): @@ -209,7 +207,18 @@ class AsyncContextManagerTestCase(unittest.TestCase): async def woohoo(): yield - for stop_exc in (StopIteration('spam'), StopAsyncIteration('ham')): + class StopIterationSubclass(StopIteration): + pass + + class StopAsyncIterationSubclass(StopAsyncIteration): + pass + + for stop_exc in ( + StopIteration('spam'), + StopAsyncIteration('ham'), + StopIterationSubclass('spam'), + StopAsyncIterationSubclass('spam') + ): with self.subTest(type=type(stop_exc)): try: async with woohoo(): @@ -307,6 +316,82 @@ class AsyncContextManagerTestCase(unittest.TestCase): self.assertEqual(ncols, 10) self.assertEqual(depth, 0) + @_async_test + async def test_decorator(self): + entered = False + + @asynccontextmanager + async def context(): + nonlocal entered + entered = True + yield + entered = False + + @context() + async def test(): + self.assertTrue(entered) + + self.assertFalse(entered) + await test() + self.assertFalse(entered) + + @_async_test + async def test_decorator_with_exception(self): + entered = False + + @asynccontextmanager + async def context(): + nonlocal entered + try: + entered = True + yield + finally: + entered = False + + @context() + async def test(): + self.assertTrue(entered) + raise NameError('foo') + + self.assertFalse(entered) + with self.assertRaisesRegex(NameError, 'foo'): + await test() + self.assertFalse(entered) + + @_async_test + async def test_decorating_method(self): + + @asynccontextmanager + async def context(): + yield + + + class Test(object): + + @context() + async def method(self, a, b, c=None): + self.a = a + self.b = b + self.c = c + + # these tests are for argument passing when used as a decorator + test = Test() + await test.method(1, 2) + self.assertEqual(test.a, 1) + self.assertEqual(test.b, 2) + self.assertEqual(test.c, None) + + test = Test() + await test.method('a', 'b', 'c') + self.assertEqual(test.a, 'a') + self.assertEqual(test.b, 'b') + self.assertEqual(test.c, 'c') + + test = Test() + await test.method(a=1, b=2) + self.assertEqual(test.a, 1) + self.assertEqual(test.b, 2) + class AclosingTestCase(unittest.TestCase): @@ -399,6 +484,13 @@ class TestAsyncExitStack(TestBaseExitStack, unittest.TestCase): return self.run_coroutine(self.__aexit__(*exc_details)) exit_stack = SyncAsyncExitStack + callback_error_internal_frames = [ + ('__exit__', 'return self.run_coroutine(self.__aexit__(*exc_details))'), + ('run_coroutine', 'raise exc'), + ('run_coroutine', 'raise exc'), + ('__aexit__', 'raise exc_details[1]'), + ('__aexit__', 'cb_suppress = cb(*exc_details)'), + ] def setUp(self): self.loop = asyncio.new_event_loop() @@ -486,7 +578,7 @@ class TestAsyncExitStack(TestBaseExitStack, unittest.TestCase): 1/0 @_async_test - async def test_async_enter_context(self): + async def test_enter_async_context(self): class TestCM(object): async def __aenter__(self): result.append(1) @@ -507,6 +599,26 @@ class TestAsyncExitStack(TestBaseExitStack, unittest.TestCase): self.assertEqual(result, [1, 2, 3, 4]) + @_async_test + async def test_enter_async_context_errors(self): + class LacksEnterAndExit: + pass + class LacksEnter: + async def __aexit__(self, *exc_info): + pass + class LacksExit: + async def __aenter__(self): + pass + + async with self.exit_stack() as stack: + with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): + await stack.enter_async_context(LacksEnterAndExit()) + with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): + await stack.enter_async_context(LacksEnter()) + with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): + await stack.enter_async_context(LacksExit()) + self.assertFalse(stack._exit_callbacks) + @_async_test async def test_async_exit_exception_chaining(self): # Ensure exception chaining matches the reference behaviour @@ -539,6 +651,53 @@ class TestAsyncExitStack(TestBaseExitStack, unittest.TestCase): self.assertIsInstance(inner_exc, ValueError) self.assertIsInstance(inner_exc.__context__, ZeroDivisionError) + @_async_test + async def test_async_exit_exception_explicit_none_context(self): + # Ensure AsyncExitStack chaining matches actual nested `with` statements + # regarding explicit __context__ = None. + + class MyException(Exception): + pass + + @asynccontextmanager + async def my_cm(): + try: + yield + except BaseException: + exc = MyException() + try: + raise exc + finally: + exc.__context__ = None + + @asynccontextmanager + async def my_cm_with_exit_stack(): + async with self.exit_stack() as stack: + await stack.enter_async_context(my_cm()) + yield stack + + for cm in (my_cm, my_cm_with_exit_stack): + with self.subTest(): + try: + async with cm(): + raise IndexError() + except MyException as exc: + self.assertIsNone(exc.__context__) + else: + self.fail("Expected IndexError, but no exception was raised") + + @_async_test + async def test_instance_bypass_async(self): + class Example(object): pass + cm = Example() + cm.__aenter__ = object() + cm.__aexit__ = object() + stack = self.exit_stack() + with self.assertRaisesRegex(TypeError, 'asynchronous context manager'): + await stack.enter_async_context(cm) + stack.push_async_exit(cm) + self.assertIs(stack._exit_callbacks[-1][1], cm) + class TestAsyncNullcontext(unittest.TestCase): @_async_test diff --git a/test/ziptestdata/README.md b/test/ziptestdata/README.md new file mode 100644 index 0000000..6b9147d --- /dev/null +++ b/test/ziptestdata/README.md @@ -0,0 +1,35 @@ +# Test data for `test_zipfile` + +The test executables in this directory are created manually from header.sh and +the `testdata_module_inside_zip.py` file. You must have infozip's zip utility +installed (`apt install zip` on Debian). + +## Purpose + +These are used to test executable files with an appended zipfile, in a scenario +where the executable is _not_ a Python interpreter itself so our automatic +zipimport machinery (that'd look for `__main__.py`) is not being used. + +## Updating the test executables + +If you update header.sh or the testdata_module_inside_zip.py file, rerun the +commands below. These are expected to be rarely changed, if ever. + +### Standard old format (2.0) zip file + +``` +zip -0 zip2.zip testdata_module_inside_zip.py +cat header.sh zip2.zip >exe_with_zip +rm zip2.zip +``` + +### Modern format (4.5) zip64 file + +Redirecting from stdin forces infozip's zip tool to create a zip64. + +``` +zip -0 zip64.zip +cat header.sh zip64.zip >exe_with_z64 +rm zip64.zip +``` + diff --git a/test/ziptestdata/exe_with_z64 b/test/ziptestdata/exe_with_z64 new file mode 100755 index 0000000000000000000000000000000000000000..82b03cf39d919d9de05866b4f7cdf1dfe47e6eb3 GIT binary patch literal 978 zcmaJ<-EY${5KkEl$xl4ME4rnV(xNp330}q~w6RoFg|_N41e7XrQ)jhEoXByzLbV6} z7bN~nxXZ`b5aOcPclPh+`)uy)&!pO)@qEF01K%5u#vZQ0`QQ{+-#hbQJlMzfN zumhbn*t?s5Bd=_jPG5pq2*m(Jgo_mHo-#sbTHp%FGB+?21c5M360YVDOC^Boi)A8| zaqW`1mIj`)NHXt(_xjvFK6&e598YZ!YZ3l8f{q6rI6U+Qr@^orj6V8rh65&(EY$|m zyw<+SERwNcOz}kI84m>G zSHN@NP(AKCZFVWm;@bWsvo1d0s^NQ(q;qlPXs1m?Of5j_0ahSNH4rM0DoR1B`pzXg zmbq!Q2?j9dhGVD|)zyN}i{}esyQ-xKTZG$#>tt`JC1{4sF91y#s`x7`^Rh*e)YvZy zgkqqkaUCw?O1P{lg45-zR7)d3Et45`xQsPi8a|7~fpf#r#O@xyAC7yz7Yxqdop@t= z+GecTY{GH*C{6^9iZVG|$~dMm;TcwVF6O`^njW)|c@d2ZNMpBKJnC=V?N}s_K0g`u zf>&q1Drr~`txm&wV0p#0b-g#i7nomB!y-wOlGog%8hujhFq@*CrC0V>0$BJLY}9Yu zdAxPoGdZHaQ8}dT$9GygqyF~x9%(2wjr1B?@B4I!vMx6ZdG|^ES=ode_3v$y*}#wR GCH6PMMGy`E literal 0 HcmV?d00001 diff --git a/test/ziptestdata/exe_with_zip b/test/ziptestdata/exe_with_zip new file mode 100755 index 0000000000000000000000000000000000000000..c833cdf9f934b08fc449da077efae44aa9a06f18 GIT binary patch literal 990 zcmah{&2G~`5Ozt6#BxUB0BGt|a)MeHBzh^WP)i*V6_O}*5D+43W3QVP_S)KAHz`pi zcm>WJc^aOC$6(e@N~!o+%den-3;)Sr%Y!Z0+w(d{LAMq3-uf@P z9m3N*lNvI$JbmPO%o9e4pea*14H@ji{DR<>2{Yf&&6LZ;8JC$DI=^T*Ba%xlbR%}U zITKu*!h8w30IGn(BDw1{$&~BKrT>oSEll57hHpZeMQq=ZPSXIfv;d*Is6d=aFi`;) zaRyv0|GCCbxYCWL2?L0zrbu-GbtR)wnZ5)z7h1BgVd6I7ve+xfDrk(z4*+%OisT#$ zRkbMQ68mL{7!IasRE86N#$2)x!D-R6OmfXY6zLc{TyYHxO~(n_b*@}Av|9(SyZyHB z1)agGL$7a-nuOHrbvUS!;zZ!62(4hslf;Y(%~9cqML=USJ$k}b$;JhQk>6X~JFcw~ z%d9)^A9mZpvl9=`=Dly-vourMXb_;{MX9UeQ7N~ZpAY<7R_*(II{NZyIx1$jt(Dau zHOneZ9ejjVI+sG|%rH|rlgP`o7b`AXUNIxrip1vZklyjijR&>AvAb(Xm+RYSv;Bwb WTE+Dm&))IcO#@!RC&c}$ajc&zD=F;& literal 0 HcmV?d00001 diff --git a/test/ziptestdata/header.sh b/test/ziptestdata/header.sh new file mode 100755 index 0000000..52dc91a --- /dev/null +++ b/test/ziptestdata/header.sh @@ -0,0 +1,24 @@ +#!/bin/bash +INTERPRETER_UNDER_TEST="$1" +if [[ ! -x "${INTERPRETER_UNDER_TEST}" ]]; then + echo "Interpreter must be the command line argument." + exit 4 +fi +EXECUTABLE="$0" exec "${INTERPRETER_UNDER_TEST}" -E - <