Source code for attr._make

# SPDX-License-Identifier: MIT

from __future__ import annotations

import abc
import contextlib
import copy
import enum
import functools
import inspect
import itertools
import linecache
import sys
import types
import typing

from operator import itemgetter

# We need to import _compat itself in addition to the _compat members to avoid
# having the thread-local in the globals here.
from . import _compat, _config, setters
from ._compat import (
    PY_3_8_PLUS,
    PY_3_10_PLUS,
    PY_3_11_PLUS,
    _AnnotationExtractor,
    _get_annotations,
    get_generic_base,
)
from .exceptions import (
    DefaultAlreadySetError,
    FrozenInstanceError,
    NotAnAttrsClassError,
    UnannotatedAttributeError,
)


# This is used at least twice, so cache it here.
_OBJ_SETATTR = object.__setattr__
_INIT_FACTORY_PAT = "__attr_factory_%s"
_CLASSVAR_PREFIXES = (
    "typing.ClassVar",
    "t.ClassVar",
    "ClassVar",
    "typing_extensions.ClassVar",
)
# we don't use a double-underscore prefix because that triggers
# name mangling when trying to create a slot for the field
# (when slots=True)
_HASH_CACHE_FIELD = "_attrs_cached_hash"

_EMPTY_METADATA_SINGLETON = types.MappingProxyType({})

# Unique object for unequivocal getattr() defaults.
_SENTINEL = object()

_DEFAULT_ON_SETATTR = setters.pipe(setters.convert, setters.validate)


class _Nothing(enum.Enum):
    """
    Sentinel to indicate the lack of a value when `None` is ambiguous.

    If extending attrs, you can use ``typing.Literal[NOTHING]`` to show
    that a value may be ``NOTHING``.

    .. versionchanged:: 21.1.0 ``bool(NOTHING)`` is now False.
    .. versionchanged:: 22.2.0 ``NOTHING`` is now an ``enum.Enum`` variant.
    """

    NOTHING = enum.auto()

    def __repr__(self):
        return "NOTHING"

    def __bool__(self):
        return False


NOTHING = _Nothing.NOTHING
"""
Sentinel to indicate the lack of a value when `None` is ambiguous.
"""


class _CacheHashWrapper(int):
    """
    An integer subclass that pickles / copies as None

    This is used for non-slots classes with ``cache_hash=True``, to avoid
    serializing a potentially (even likely) invalid hash value. Since `None`
    is the default value for uncalculated hashes, whenever this is copied,
    the copy's value for the hash should automatically reset.

    See GH #613 for more details.
    """

    def __reduce__(self, _none_constructor=type(None), _args=()):  # noqa: B008
        return _none_constructor, _args
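
# Illustrative only (not part of the original module): because __reduce__
# reports a NoneType constructor, both pickling and copying a wrapped hash
# collapse it back to None, i.e. "not yet computed".
#
#     >>> import copy, pickle
#     >>> w = _CacheHashWrapper(42)
#     >>> int(w)
#     42
#     >>> copy.copy(w) is None
#     True
#     >>> pickle.loads(pickle.dumps(w)) is None
#     True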


def attrib(
    default=NOTHING,
    validator=None,
    repr=True,
    cmp=None,
    hash=None,
    init=True,
    metadata=None,
    type=None,
    converter=None,
    factory=None,
    kw_only=False,
    eq=None,
    order=None,
    on_setattr=None,
    alias=None,
):
    """
    Create a new field / attribute on a class.

    Identical to `attrs.field`, except it's not keyword-only.

    Consider using `attrs.field` in new code (``attr.ib`` will *never* go away,
    though).

    ..  warning::

        Does **nothing** unless the class is also decorated with
        `attr.s` (or similar)!


    .. versionadded:: 15.2.0 *convert*
    .. versionadded:: 16.3.0 *metadata*
    .. versionchanged:: 17.1.0 *validator* can be a ``list`` now.
    .. versionchanged:: 17.1.0
       *hash* is `None` and therefore mirrors *eq* by default.
    .. versionadded:: 17.3.0 *type*
    .. deprecated:: 17.4.0 *convert*
    .. versionadded:: 17.4.0
       *converter* as a replacement for the deprecated *convert* to achieve
       consistency with other noun-based arguments.
    .. versionadded:: 18.1.0
       ``factory=f`` is syntactic sugar for ``default=attr.Factory(f)``.
    .. versionadded:: 18.2.0 *kw_only*
    .. versionchanged:: 19.2.0 *convert* keyword argument removed.
    .. versionchanged:: 19.2.0 *repr* also accepts a custom callable.
    .. deprecated:: 19.2.0 *cmp* Removal on or after 2021-06-01.
    .. versionadded:: 19.2.0 *eq* and *order*
    .. versionadded:: 20.1.0 *on_setattr*
    .. versionchanged:: 20.3.0 *kw_only* backported to Python 2
    .. versionchanged:: 21.1.0
       *eq*, *order*, and *cmp* also accept a custom callable
    .. versionchanged:: 21.1.0 *cmp* undeprecated
    .. versionadded:: 22.2.0 *alias*
    """
    eq, eq_key, order, order_key = _determine_attrib_eq_order(
        cmp, eq, order, True
    )

    if hash is not None and hash is not True and hash is not False:
        msg = "Invalid value for hash.  Must be True, False, or None."
        raise TypeError(msg)

    if factory is not None:
        if default is not NOTHING:
            msg = (
                "The `default` and `factory` arguments are mutually exclusive."
            )
            raise ValueError(msg)
        if not callable(factory):
            msg = "The `factory` argument must be a callable."
            raise ValueError(msg)
        default = Factory(factory)

    if metadata is None:
        metadata = {}

    # Apply syntactic sugar by auto-wrapping.
    if isinstance(on_setattr, (list, tuple)):
        on_setattr = setters.pipe(*on_setattr)

    if validator and isinstance(validator, (list, tuple)):
        validator = and_(*validator)

    if converter and isinstance(converter, (list, tuple)):
        converter = pipe(*converter)

    return _CountingAttr(
        default=default,
        validator=validator,
        repr=repr,
        cmp=None,
        hash=hash,
        init=init,
        converter=converter,
        metadata=metadata,
        type=type,
        kw_only=kw_only,
        eq=eq,
        eq_key=eq_key,
        order=order,
        order_key=order_key,
        on_setattr=on_setattr,
        alias=alias,
    )
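
# A hedged usage sketch (not part of the original module): `attr.ib` only has
# an effect on classes that are also decorated with `attr.s`.
#
#     >>> import attr
#     >>> @attr.s
#     ... class Point:
#     ...     x = attr.ib(default=0)
#     ...     y = attr.ib(factory=int)
#     >>> Point()
#     Point(x=0, y=0)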


def _compile_and_eval(script, globs, locs=None, filename=""):
    """
    Evaluate the script with the given global (globs) and local (locs)
    variables.
    """
    bytecode = compile(script, filename, "exec")
    eval(bytecode, globs, locs)


def _make_method(name, script, filename, globs, locals=None):
    """
    Create the method with the script given and return the method object.
    """
    locs = {} if locals is None else locals

    # In order for debuggers like PDB to be able to step through the code,
    # we add a fake linecache entry.
    count = 1
    base_filename = filename
    while True:
        linecache_tuple = (
            len(script),
            None,
            script.splitlines(True),
            filename,
        )
        old_val = linecache.cache.setdefault(filename, linecache_tuple)
        if old_val == linecache_tuple:
            break

        filename = f"{base_filename[:-1]}-{count}>"
        count += 1

    _compile_and_eval(script, globs, locs, filename)

    return locs[name]
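
# Illustrative only (the names `src` and `greet` are made up for this sketch):
# this is how the rest of the module turns generated source text into a real
# function object, with a fake linecache entry so debuggers can step into it.
#
#     >>> src = "def greet(self):\n    return 'hi'"
#     >>> greet = _make_method("greet", src, "<attrs example greet>", {})
#     >>> greet(object())
#     'hi'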


def _make_attr_tuple_class(cls_name, attr_names):
    """
    Create a tuple subclass to hold `Attribute`s for an `attrs` class.

    The subclass is a bare tuple with properties for names.

    class MyClassAttributes(tuple):
        __slots__ = ()
        x = property(itemgetter(0))
    """
    attr_class_name = f"{cls_name}Attributes"
    attr_class_template = [
        f"class {attr_class_name}(tuple):",
        "    __slots__ = ()",
    ]
    if attr_names:
        for i, attr_name in enumerate(attr_names):
            attr_class_template.append(
                f"    {attr_name} = _attrs_property(_attrs_itemgetter({i}))"
            )
    else:
        attr_class_template.append("    pass")
    globs = {"_attrs_itemgetter": itemgetter, "_attrs_property": property}
    _compile_and_eval("\n".join(attr_class_template), globs)
    return globs[attr_class_name]
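
# Illustrative only (the name `Pair` is made up for this sketch): the returned
# class is a bare tuple with named accessors.
#
#     >>> Pair = _make_attr_tuple_class("Pair", ["x", "y"])
#     >>> p = Pair([1, 2])
#     >>> p.x, p[1]
#     (1, 2)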


# Tuple class for extracted attributes from a class definition.
# `base_attrs` is a subset of `attrs`.
_Attributes = _make_attr_tuple_class(
    "_Attributes",
    [
        # all attributes to build dunder methods for
        "attrs",
        # attributes that have been inherited
        "base_attrs",
        # map inherited attributes to their originating classes
        "base_attrs_map",
    ],
)


def _is_class_var(annot):
    """
    Check whether *annot* is a typing.ClassVar.

    The string comparison hack is used to avoid evaluating all string
    annotations which would put attrs-based classes at a performance
    disadvantage compared to plain old classes.
    """
    annot = str(annot)

    # Annotation can be quoted.
    if annot.startswith(("'", '"')) and annot.endswith(("'", '"')):
        annot = annot[1:-1]

    return annot.startswith(_CLASSVAR_PREFIXES)
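
# Illustrative only: the check is purely textual, so quoted (stringified)
# annotations are recognized as well.
#
#     >>> _is_class_var("typing.ClassVar[int]")
#     True
#     >>> _is_class_var("'ClassVar[int]'")
#     True
#     >>> _is_class_var(int)
#     False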


def _has_own_attribute(cls, attrib_name):
    """
    Check whether *cls* defines *attrib_name* (and doesn't just inherit it).
    """
    return attrib_name in cls.__dict__


def _collect_base_attrs(cls, taken_attr_names):
    """
    Collect attr.ibs from base classes of *cls*, except *taken_attr_names*.
    """
    base_attrs = []
    base_attr_map = {}  # A dictionary of base attrs to their classes.

    # Traverse the MRO and collect attributes.
    for base_cls in reversed(cls.__mro__[1:-1]):
        for a in getattr(base_cls, "__attrs_attrs__", []):
            if a.inherited or a.name in taken_attr_names:
                continue

            a = a.evolve(inherited=True)  # noqa: PLW2901
            base_attrs.append(a)
            base_attr_map[a.name] = base_cls

    # For each name, only keep the freshest definition, i.e. the one furthest
    # at the back.  base_attr_map is fine because it gets overwritten with
    # every new instance.
    filtered = []
    seen = set()
    for a in reversed(base_attrs):
        if a.name in seen:
            continue
        filtered.insert(0, a)
        seen.add(a.name)

    return filtered, base_attr_map


def _collect_base_attrs_broken(cls, taken_attr_names):
    """
    Collect attr.ibs from base classes of *cls*, except *taken_attr_names*.

    N.B. *taken_attr_names* will be mutated.

    Adhere to the old incorrect behavior.

    Notably it collects from the front and considers inherited attributes which
    leads to the buggy behavior reported in #428.
    """
    base_attrs = []
    base_attr_map = {}  # A dictionary of base attrs to their classes.

    # Traverse the MRO and collect attributes.
    for base_cls in cls.__mro__[1:-1]:
        for a in getattr(base_cls, "__attrs_attrs__", []):
            if a.name in taken_attr_names:
                continue

            a = a.evolve(inherited=True)  # noqa: PLW2901
            taken_attr_names.add(a.name)
            base_attrs.append(a)
            base_attr_map[a.name] = base_cls

    return base_attrs, base_attr_map


def _transform_attrs(
    cls, these, auto_attribs, kw_only, collect_by_mro, field_transformer
):
    """
    Transform all `_CountingAttr`s on a class into `Attribute`s.

    If *these* is passed, use that and don't look for them on the class.

    If *collect_by_mro* is True, collect them in the correct MRO order,
    otherwise use the old -- incorrect -- order.  See #428.

    Return an `_Attributes`.
    """
    cd = cls.__dict__
    anns = _get_annotations(cls)

    if these is not None:
        ca_list = list(these.items())
    elif auto_attribs is True:
        ca_names = {
            name
            for name, attr in cd.items()
            if isinstance(attr, _CountingAttr)
        }
        ca_list = []
        annot_names = set()
        for attr_name, type in anns.items():
            if _is_class_var(type):
                continue
            annot_names.add(attr_name)
            a = cd.get(attr_name, NOTHING)

            if not isinstance(a, _CountingAttr):
                a = attrib() if a is NOTHING else attrib(default=a)
            ca_list.append((attr_name, a))

        unannotated = ca_names - annot_names
        if len(unannotated) > 0:
            raise UnannotatedAttributeError(
                "The following `attr.ib`s lack a type annotation: "
                + ", ".join(
                    sorted(unannotated, key=lambda n: cd.get(n).counter)
                )
                + "."
            )
    else:
        ca_list = sorted(
            (
                (name, attr)
                for name, attr in cd.items()
                if isinstance(attr, _CountingAttr)
            ),
            key=lambda e: e[1].counter,
        )

    own_attrs = [
        Attribute.from_counting_attr(
            name=attr_name, ca=ca, type=anns.get(attr_name)
        )
        for attr_name, ca in ca_list
    ]

    if collect_by_mro:
        base_attrs, base_attr_map = _collect_base_attrs(
            cls, {a.name for a in own_attrs}
        )
    else:
        base_attrs, base_attr_map = _collect_base_attrs_broken(
            cls, {a.name for a in own_attrs}
        )

    if kw_only:
        own_attrs = [a.evolve(kw_only=True) for a in own_attrs]
        base_attrs = [a.evolve(kw_only=True) for a in base_attrs]

    attrs = base_attrs + own_attrs

    # Mandatory vs non-mandatory attr order only matters when they are part of
    # the __init__ signature and when they aren't kw_only (which are moved to
    # the end and can be mandatory or non-mandatory in any order, as they will
    # be specified as keyword args anyway). Check the order of those attrs:
    had_default = False
    for a in (a for a in attrs if a.init is not False and a.kw_only is False):
        if had_default is True and a.default is NOTHING:
            msg = f"No mandatory attributes allowed after an attribute with a default value or factory.  Attribute in question: {a!r}"
            raise ValueError(msg)

        if had_default is False and a.default is not NOTHING:
            had_default = True

    if field_transformer is not None:
        attrs = field_transformer(cls, attrs)

    # Resolve default field alias after executing field_transformer.
    # This allows field_transformer to differentiate between explicit vs
    # default aliases and supply their own defaults.
    attrs = [
        a.evolve(alias=_default_init_alias_for(a.name)) if not a.alias else a
        for a in attrs
    ]

    # Create AttrsClass *after* applying the field_transformer since it may
    # add or remove attributes!
    attr_names = [a.name for a in attrs]
    AttrsClass = _make_attr_tuple_class(cls.__name__, attr_names)

    return _Attributes((AttrsClass(attrs), base_attrs, base_attr_map))


def _make_cached_property_getattr(cached_properties, original_getattr, cls):
    lines = [
        # Wrapped to get `__class__` into closure cell for super()
        # (It will be replaced with the newly constructed class after construction).
        "def wrapper(_cls):",
        "    __class__ = _cls",
        "    def __getattr__(self, item, cached_properties=cached_properties, original_getattr=original_getattr, _cached_setattr_get=_cached_setattr_get):",
        "         func = cached_properties.get(item)",
        "         if func is not None:",
        "              result = func(self)",
        "              _setter = _cached_setattr_get(self)",
        "              _setter(item, result)",
        "              return result",
    ]
    if original_getattr is not None:
        lines.append(
            "         return original_getattr(self, item)",
        )
    else:
        lines.extend(
            [
                "         try:",
                "             return super().__getattribute__(item)",
                "         except AttributeError:",
                "             if not hasattr(super(), '__getattr__'):",
                "                 raise",
                "             return super().__getattr__(item)",
                "         original_error = f\"'{self.__class__.__name__}' object has no attribute '{item}'\"",
                "         raise AttributeError(original_error)",
            ]
        )

    lines.extend(
        [
            "    return __getattr__",
            "__getattr__ = wrapper(_cls)",
        ]
    )

    unique_filename = _generate_unique_filename(cls, "getattr")

    glob = {
        "cached_properties": cached_properties,
        "_cached_setattr_get": _OBJ_SETATTR.__get__,
        "original_getattr": original_getattr,
    }

    return _make_method(
        "__getattr__",
        "\n".join(lines),
        unique_filename,
        glob,
        locals={
            "_cls": cls,
        },
    )


def _frozen_setattrs(self, name, value):
    """
    Attached to frozen classes as __setattr__.
    """
    if isinstance(self, BaseException) and name in (
        "__cause__",
        "__context__",
        "__traceback__",
    ):
        BaseException.__setattr__(self, name, value)
        return

    raise FrozenInstanceError()


def _frozen_delattrs(self, name):
    """
    Attached to frozen classes as __delattr__.
    """
    raise FrozenInstanceError()


class _ClassBuilder:
    """
    Iteratively build *one* class.
    """

    __slots__ = (
        "_attr_names",
        "_attrs",
        "_base_attr_map",
        "_base_names",
        "_cache_hash",
        "_cls",
        "_cls_dict",
        "_delete_attribs",
        "_frozen",
        "_has_pre_init",
        "_pre_init_has_args",
        "_has_post_init",
        "_is_exc",
        "_on_setattr",
        "_slots",
        "_weakref_slot",
        "_wrote_own_setattr",
        "_has_custom_setattr",
    )

    def __init__(
        self,
        cls,
        these,
        slots,
        frozen,
        weakref_slot,
        getstate_setstate,
        auto_attribs,
        kw_only,
        cache_hash,
        is_exc,
        collect_by_mro,
        on_setattr,
        has_custom_setattr,
        field_transformer,
    ):
        attrs, base_attrs, base_map = _transform_attrs(
            cls,
            these,
            auto_attribs,
            kw_only,
            collect_by_mro,
            field_transformer,
        )

        self._cls = cls
        self._cls_dict = dict(cls.__dict__) if slots else {}
        self._attrs = attrs
        self._base_names = {a.name for a in base_attrs}
        self._base_attr_map = base_map
        self._attr_names = tuple(a.name for a in attrs)
        self._slots = slots
        self._frozen = frozen
        self._weakref_slot = weakref_slot
        self._cache_hash = cache_hash
        self._has_pre_init = bool(getattr(cls, "__attrs_pre_init__", False))
        self._pre_init_has_args = False
        if self._has_pre_init:
            # Check if the pre init method has more arguments than just `self`
            # We want to pass arguments if pre init expects arguments
            pre_init_func = cls.__attrs_pre_init__
            pre_init_signature = inspect.signature(pre_init_func)
            self._pre_init_has_args = len(pre_init_signature.parameters) > 1
        self._has_post_init = bool(getattr(cls, "__attrs_post_init__", False))
        self._delete_attribs = not bool(these)
        self._is_exc = is_exc
        self._on_setattr = on_setattr

        self._has_custom_setattr = has_custom_setattr
        self._wrote_own_setattr = False

        self._cls_dict["__attrs_attrs__"] = self._attrs

        if frozen:
            self._cls_dict["__setattr__"] = _frozen_setattrs
            self._cls_dict["__delattr__"] = _frozen_delattrs

            self._wrote_own_setattr = True
        elif on_setattr in (
            _DEFAULT_ON_SETATTR,
            setters.validate,
            setters.convert,
        ):
            has_validator = has_converter = False
            for a in attrs:
                if a.validator is not None:
                    has_validator = True
                if a.converter is not None:
                    has_converter = True

                if has_validator and has_converter:
                    break
            if (
                (
                    on_setattr == _DEFAULT_ON_SETATTR
                    and not (has_validator or has_converter)
                )
                or (on_setattr == setters.validate and not has_validator)
                or (on_setattr == setters.convert and not has_converter)
            ):
                # If class-level on_setattr is set to convert + validate, but
                # there's no field to convert or validate, pretend like there's
                # no on_setattr.
                self._on_setattr = None

        if getstate_setstate:
            (
                self._cls_dict["__getstate__"],
                self._cls_dict["__setstate__"],
            ) = self._make_getstate_setstate()

    def __repr__(self):
        return f"<_ClassBuilder(cls={self._cls.__name__})>"

    def build_class(self):
        """
        Finalize class based on the accumulated configuration.

        Builder cannot be used after calling this method.
        """
        if self._slots is True:
            cls = self._create_slots_class()
        else:
            cls = self._patch_original_class()
            if PY_3_10_PLUS:
                cls = abc.update_abstractmethods(cls)

        # The method only gets called if it's not inherited from a base class.
        # _has_own_attribute does NOT work properly for classmethods.
        if (
            getattr(cls, "__attrs_init_subclass__", None)
            and "__attrs_init_subclass__" not in cls.__dict__
        ):
            cls.__attrs_init_subclass__()

        return cls

    def _patch_original_class(self):
        """
        Apply accumulated methods and return the class.
        """
        cls = self._cls
        base_names = self._base_names

        # Clean class of attribute definitions (`attr.ib()`s).
        if self._delete_attribs:
            for name in self._attr_names:
                if (
                    name not in base_names
                    and getattr(cls, name, _SENTINEL) is not _SENTINEL
                ):
                    # An AttributeError can happen if a base class defines a
                    # class variable and we want to set an attribute with the
                    # same name by using only a type annotation.
                    with contextlib.suppress(AttributeError):
                        delattr(cls, name)

        # Attach our dunder methods.
        for name, value in self._cls_dict.items():
            setattr(cls, name, value)

        # If we've inherited an attrs __setattr__ and don't write our own,
        # reset it to object's.
        if not self._wrote_own_setattr and getattr(
            cls, "__attrs_own_setattr__", False
        ):
            cls.__attrs_own_setattr__ = False

            if not self._has_custom_setattr:
                cls.__setattr__ = _OBJ_SETATTR

        return cls

    def _create_slots_class(self):
        """
        Build and return a new class with a `__slots__` attribute.
        """
        cd = {
            k: v
            for k, v in self._cls_dict.items()
            if k not in (*tuple(self._attr_names), "__dict__", "__weakref__")
        }

        # If our class doesn't have its own implementation of __setattr__
        # (either from the user or by us), check the bases, if one of them has
        # an attrs-made __setattr__, that needs to be reset. We don't walk the
        # MRO because we only care about our immediate base classes.
        # XXX: This can be confused by subclassing a slotted attrs class with
        # XXX: a non-attrs class and subclass the resulting class with an attrs
        # XXX: class.  See `test_slotted_confused` for details.  For now that's
        # XXX: OK with us.
        if not self._wrote_own_setattr:
            cd["__attrs_own_setattr__"] = False

            if not self._has_custom_setattr:
                for base_cls in self._cls.__bases__:
                    if base_cls.__dict__.get("__attrs_own_setattr__", False):
                        cd["__setattr__"] = _OBJ_SETATTR
                        break

        # Traverse the MRO to collect existing slots
        # and check for an existing __weakref__.
        existing_slots = {}
        weakref_inherited = False
        for base_cls in self._cls.__mro__[1:-1]:
            if base_cls.__dict__.get("__weakref__", None) is not None:
                weakref_inherited = True
            existing_slots.update(
                {
                    name: getattr(base_cls, name)
                    for name in getattr(base_cls, "__slots__", [])
                }
            )

        base_names = set(self._base_names)

        names = self._attr_names
        if (
            self._weakref_slot
            and "__weakref__" not in getattr(self._cls, "__slots__", ())
            and "__weakref__" not in names
            and not weakref_inherited
        ):
            names += ("__weakref__",)

        if PY_3_8_PLUS:
            cached_properties = {
                name: cached_property.func
                for name, cached_property in cd.items()
                if isinstance(cached_property, functools.cached_property)
            }
        else:
            # `functools.cached_property` was introduced in 3.8,
            # so it can't be used before that.
            cached_properties = {}

        # Collect methods with a `__class__` reference that are shadowed in
        # the new class, so we know to update them.
        additional_closure_functions_to_update = []
        if cached_properties:
            class_annotations = _get_annotations(self._cls)
            for name, func in cached_properties.items():
                # Add cached properties to names for slotting.
                names += (name,)
                # Clear out function from class to avoid clashing.
                del cd[name]
                additional_closure_functions_to_update.append(func)
                annotation = inspect.signature(func).return_annotation
                if annotation is not inspect.Parameter.empty:
                    class_annotations[name] = annotation

            original_getattr = cd.get("__getattr__")
            if original_getattr is not None:
                additional_closure_functions_to_update.append(original_getattr)

            cd["__getattr__"] = _make_cached_property_getattr(
                cached_properties, original_getattr, self._cls
            )

        # We only add the names of attributes that aren't inherited.
        # Setting __slots__ to inherited attributes wastes memory.
        slot_names = [name for name in names if name not in base_names]

        # There are slots for attributes from the current class
        # that are defined in parent classes.
        # As their descriptors may be overridden by a child class,
        # we collect them here and update the class dict.
        reused_slots = {
            slot: slot_descriptor
            for slot, slot_descriptor in existing_slots.items()
            if slot in slot_names
        }
        slot_names = [name for name in slot_names if name not in reused_slots]
        cd.update(reused_slots)
        if self._cache_hash:
            slot_names.append(_HASH_CACHE_FIELD)

        cd["__slots__"] = tuple(slot_names)

        cd["__qualname__"] = self._cls.__qualname__

        # Create new class based on old class and our methods.
        cls = type(self._cls)(self._cls.__name__, self._cls.__bases__, cd)

        # The following is a fix for
        # <https://github.com/python-attrs/attrs/issues/102>.
        # If a method mentions `__class__` or uses the no-arg super(), the
        # compiler will bake a reference to the class in the method itself
        # as `method.__closure__`.  Since we replace the class with a
        # clone, we rewrite these references so it keeps working.
        for item in itertools.chain(
            cls.__dict__.values(), additional_closure_functions_to_update
        ):
            if isinstance(item, (classmethod, staticmethod)):
                # Class- and staticmethods hide their functions inside.
                # These might need to be rewritten as well.
                closure_cells = getattr(item.__func__, "__closure__", None)
            elif isinstance(item, property):
                # Workaround for property `super()` shortcut (PY3-only).
                # There is no universal way for other descriptors.
                closure_cells = getattr(item.fget, "__closure__", None)
            else:
                closure_cells = getattr(item, "__closure__", None)

            if not closure_cells:  # Catch None or the empty list.
                continue
            for cell in closure_cells:
                try:
                    match = cell.cell_contents is self._cls
                except ValueError:  # noqa: PERF203
                    # ValueError: Cell is empty
                    pass
                else:
                    if match:
                        cell.cell_contents = cls
        return cls

    def add_repr(self, ns):
        self._cls_dict["__repr__"] = self._add_method_dunders(
            _make_repr(self._attrs, ns, self._cls)
        )
        return self

    def add_str(self):
        repr = self._cls_dict.get("__repr__")
        if repr is None:
            msg = "__str__ can only be generated if a __repr__ exists."
            raise ValueError(msg)

        def __str__(self):
            return self.__repr__()

        self._cls_dict["__str__"] = self._add_method_dunders(__str__)
        return self

    def _make_getstate_setstate(self):
        """
        Create custom __setstate__ and __getstate__ methods.
        """
        # __weakref__ is not writable.
        state_attr_names = tuple(
            an for an in self._attr_names if an != "__weakref__"
        )

        def slots_getstate(self):
            """
            Automatically created by attrs.
            """
            return {name: getattr(self, name) for name in state_attr_names}

        hash_caching_enabled = self._cache_hash

        def slots_setstate(self, state):
            """
            Automatically created by attrs.
            """
            __bound_setattr = _OBJ_SETATTR.__get__(self)
            if isinstance(state, tuple):
                # Backward compatibility with attrs instances pickled with
                # attrs versions before v22.2.0 which stored tuples.
                for name, value in zip(state_attr_names, state):
                    __bound_setattr(name, value)
            else:
                for name in state_attr_names:
                    if name in state:
                        __bound_setattr(name, state[name])

            # The hash code cache is not included when the object is
            # serialized, but it still needs to be initialized to None to
            # indicate that the first call to __hash__ should be a cache
            # miss.
            if hash_caching_enabled:
                __bound_setattr(_HASH_CACHE_FIELD, None)

        return slots_getstate, slots_setstate

    def make_unhashable(self):
        self._cls_dict["__hash__"] = None
        return self

    def add_hash(self):
        self._cls_dict["__hash__"] = self._add_method_dunders(
            _make_hash(
                self._cls,
                self._attrs,
                frozen=self._frozen,
                cache_hash=self._cache_hash,
            )
        )

        return self

    def add_init(self):
        self._cls_dict["__init__"] = self._add_method_dunders(
            _make_init(
                self._cls,
                self._attrs,
                self._has_pre_init,
                self._pre_init_has_args,
                self._has_post_init,
                self._frozen,
                self._slots,
                self._cache_hash,
                self._base_attr_map,
                self._is_exc,
                self._on_setattr,
                attrs_init=False,
            )
        )

        return self

    def add_match_args(self):
        self._cls_dict["__match_args__"] = tuple(
            field.name
            for field in self._attrs
            if field.init and not field.kw_only
        )

    def add_attrs_init(self):
        self._cls_dict["__attrs_init__"] = self._add_method_dunders(
            _make_init(
                self._cls,
                self._attrs,
                self._has_pre_init,
                self._pre_init_has_args,
                self._has_post_init,
                self._frozen,
                self._slots,
                self._cache_hash,
                self._base_attr_map,
                self._is_exc,
                self._on_setattr,
                attrs_init=True,
            )
        )

        return self

    def add_eq(self):
        cd = self._cls_dict

        cd["__eq__"] = self._add_method_dunders(
            _make_eq(self._cls, self._attrs)
        )
        cd["__ne__"] = self._add_method_dunders(_make_ne())

        return self

    def add_order(self):
        cd = self._cls_dict

        cd["__lt__"], cd["__le__"], cd["__gt__"], cd["__ge__"] = (
            self._add_method_dunders(meth)
            for meth in _make_order(self._cls, self._attrs)
        )

        return self

    def add_setattr(self):
        if self._frozen:
            return self

        sa_attrs = {}
        for a in self._attrs:
            on_setattr = a.on_setattr or self._on_setattr
            if on_setattr and on_setattr is not setters.NO_OP:
                sa_attrs[a.name] = a, on_setattr

        if not sa_attrs:
            return self

        if self._has_custom_setattr:
            # We need to write a __setattr__ but there already is one!
            msg = "Can't combine custom __setattr__ with on_setattr hooks."
            raise ValueError(msg)

        # docstring comes from _add_method_dunders
        def __setattr__(self, name, val):
            try:
                a, hook = sa_attrs[name]
            except KeyError:
                nval = val
            else:
                nval = hook(self, a, val)

            _OBJ_SETATTR(self, name, nval)

        self._cls_dict["__attrs_own_setattr__"] = True
        self._cls_dict["__setattr__"] = self._add_method_dunders(__setattr__)
        self._wrote_own_setattr = True

        return self

    def _add_method_dunders(self, method):
        """
        Add __module__ and __qualname__ to a *method* if possible.
        """
        with contextlib.suppress(AttributeError):
            method.__module__ = self._cls.__module__

        with contextlib.suppress(AttributeError):
            method.__qualname__ = f"{self._cls.__qualname__}.{method.__name__}"

        with contextlib.suppress(AttributeError):
            method.__doc__ = (
                "Method generated by attrs for class "
                f"{self._cls.__qualname__}."
            )

        return method


def _determine_attrs_eq_order(cmp, eq, order, default_eq):
    """
    Validate the combination of *cmp*, *eq*, and *order*. Derive the effective
    values of eq and order.  If *eq* is None, set it to *default_eq*.
    """
    if cmp is not None and any((eq is not None, order is not None)):
        msg = "Don't mix `cmp` with `eq' and `order`."
        raise ValueError(msg)

    # cmp takes precedence due to bw-compatibility.
    if cmp is not None:
        return cmp, cmp

    # If left None, equality is set to the specified default and ordering
    # mirrors equality.
    if eq is None:
        eq = default_eq

    if order is None:
        order = eq

    if eq is False and order is True:
        msg = "`order` can only be True if `eq` is True too."
        raise ValueError(msg)

    return eq, order
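
# Illustrative only: *cmp* wins for backward compatibility, otherwise *order*
# mirrors *eq*, which in turn falls back to *default_eq*.
#
#     >>> _determine_attrs_eq_order(None, None, None, True)
#     (True, True)
#     >>> _determine_attrs_eq_order(False, None, None, True)
#     (False, False)
#     >>> _determine_attrs_eq_order(None, True, None, True)
#     (True, True)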


def _determine_attrib_eq_order(cmp, eq, order, default_eq):
    """
    Validate the combination of *cmp*, *eq*, and *order*. Derive the effective
    values of eq and order.  If *eq* is None, set it to *default_eq*.
    """
    if cmp is not None and any((eq is not None, order is not None)):
        msg = "Don't mix `cmp` with `eq' and `order`."
        raise ValueError(msg)

    def decide_callable_or_boolean(value):
        """
        Decide whether a key function is used.
        """
        if callable(value):
            value, key = True, value
        else:
            key = None
        return value, key

    # cmp takes precedence due to bw-compatibility.
    if cmp is not None:
        cmp, cmp_key = decide_callable_or_boolean(cmp)
        return cmp, cmp_key, cmp, cmp_key

    # If left None, equality is set to the specified default and ordering
    # mirrors equality.
    if eq is None:
        eq, eq_key = default_eq, None
    else:
        eq, eq_key = decide_callable_or_boolean(eq)

    if order is None:
        order, order_key = eq, eq_key
    else:
        order, order_key = decide_callable_or_boolean(order)

    if eq is False and order is True:
        msg = "`order` can only be True if `eq` is True too."
        raise ValueError(msg)

    return eq, eq_key, order, order_key
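
# Illustrative only: passing a callable for *eq* turns it into a key function
# and enables equality (and, by mirroring, ordering) at the same time.
#
#     >>> eq, eq_key, order, order_key = _determine_attrib_eq_order(
#     ...     None, str.lower, None, True
#     ... )
#     >>> (eq, order, eq_key is str.lower, order_key is str.lower)
#     (True, True, True, True)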


def _determine_whether_to_implement(
    cls, flag, auto_detect, dunders, default=True
):
    """
    Check whether we should implement a set of methods for *cls*.

    *flag* is the argument passed into @attr.s like 'init', *auto_detect* is
    the same as passed into @attr.s, and *dunders* is a tuple of attribute
    names whose presence signals that the user has implemented it themselves.

    Return *default* if no reason either for or against is found.
    """
    if flag is True or flag is False:
        return flag

    if flag is None and auto_detect is False:
        return default

    # Logically, flag is None and auto_detect is True here.
    for dunder in dunders:
        if _has_own_attribute(cls, dunder):
            return False

    return default
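
# Illustrative only (the classes `Plain` and `HasRepr` are made up for this
# sketch): with auto_detect off the default wins; with auto_detect on, a
# user-written dunder suppresses generation.
#
#     >>> class Plain:
#     ...     pass
#     >>> _determine_whether_to_implement(Plain, None, False, ("__repr__",))
#     True
#     >>> class HasRepr:
#     ...     def __repr__(self):
#     ...         return "custom"
#     >>> _determine_whether_to_implement(HasRepr, None, True, ("__repr__",))
#     False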


def attrs(
    maybe_cls=None,
    these=None,
    repr_ns=None,
    repr=None,
    cmp=None,
    hash=None,
    init=None,
    slots=False,
    frozen=False,
    weakref_slot=True,
    str=False,
    auto_attribs=False,
    kw_only=False,
    cache_hash=False,
    auto_exc=False,
    eq=None,
    order=None,
    auto_detect=False,
    collect_by_mro=False,
    getstate_setstate=None,
    on_setattr=None,
    field_transformer=None,
    match_args=True,
    unsafe_hash=None,
):
    r"""
    A class decorator that adds :term:`dunder methods` according to the
    specified attributes using `attr.ib` or the *these* argument.

    Consider using `attrs.define` / `attrs.frozen` in new code (``attr.s`` will
    *never* go away, though).

    Args:
        repr_ns (str):
            When using nested classes, there was no way in Python 2 to
            automatically detect that.  This argument allows setting a custom
            name for a more meaningful ``repr`` output.  This argument is
            pointless in Python 3 and is therefore deprecated.

    .. caution::
        Refer to `attrs.define` for the rest of the parameters, but note that they
        can have different defaults.

        Notably, leaving *on_setattr* as `None` will **not** add any hooks.

    .. versionadded:: 16.0.0 *slots*
    .. versionadded:: 16.1.0 *frozen*
    .. versionadded:: 16.3.0 *str*
    .. versionadded:: 16.3.0 Support for ``__attrs_post_init__``.
    .. versionchanged:: 17.1.0
       *hash* supports `None` as value which is also the default now.
    .. versionadded:: 17.3.0 *auto_attribs*
    .. versionchanged:: 18.1.0
       If *these* is passed, no attributes are deleted from the class body.
    .. versionchanged:: 18.1.0 If *these* is ordered, the order is retained.
    .. versionadded:: 18.2.0 *weakref_slot*
    .. deprecated:: 18.2.0
       ``__lt__``, ``__le__``, ``__gt__``, and ``__ge__`` now raise a
       `DeprecationWarning` if the classes compared are subclasses of
       each other. ``__eq__`` and ``__ne__`` never tried to compare subclasses
       to each other.
    .. versionchanged:: 19.2.0
       ``__lt__``, ``__le__``, ``__gt__``, and ``__ge__`` now do not consider
       subclasses comparable anymore.
    .. versionadded:: 18.2.0 *kw_only*
    .. versionadded:: 18.2.0 *cache_hash*
    .. versionadded:: 19.1.0 *auto_exc*
    .. deprecated:: 19.2.0 *cmp* Removal on or after 2021-06-01.
    .. versionadded:: 19.2.0 *eq* and *order*
    .. versionadded:: 20.1.0 *auto_detect*
    .. versionadded:: 20.1.0 *collect_by_mro*
    .. versionadded:: 20.1.0 *getstate_setstate*
    .. versionadded:: 20.1.0 *on_setattr*
    .. versionadded:: 20.3.0 *field_transformer*
    .. versionchanged:: 21.1.0
       ``init=False`` injects ``__attrs_init__``
    .. versionchanged:: 21.1.0 Support for ``__attrs_pre_init__``
    .. versionchanged:: 21.1.0 *cmp* undeprecated
    .. versionadded:: 21.3.0 *match_args*
    .. versionadded:: 22.2.0
       *unsafe_hash* as an alias for *hash* (for :pep:`681` compliance).
    .. deprecated:: 24.1.0 *repr_ns*
    .. versionchanged:: 24.1.0
       Instances are not compared as tuples of attributes anymore, but using a
       big ``and`` condition. This is faster and has more correct behavior for
       uncomparable values like `math.nan`.
    .. versionadded:: 24.1.0
       If a class has an *inherited* classmethod called
       ``__attrs_init_subclass__``, it is executed after the class is created.
    .. deprecated:: 24.1.0 *hash* is deprecated in favor of *unsafe_hash*.
    """
    if repr_ns is not None:
        import warnings

        warnings.warn(
            DeprecationWarning(
                "The `repr_ns` argument is deprecated and will be removed in or after August 2025."
            ),
            stacklevel=2,
        )

    eq_, order_ = _determine_attrs_eq_order(cmp, eq, order, None)

    #  unsafe_hash takes precedence due to PEP 681.
    if unsafe_hash is not None:
        hash = unsafe_hash

    if isinstance(on_setattr, (list, tuple)):
        on_setattr = setters.pipe(*on_setattr)

    def wrap(cls):
        is_frozen = frozen or _has_frozen_base_class(cls)
        is_exc = auto_exc is True and issubclass(cls, BaseException)
        has_own_setattr = auto_detect and _has_own_attribute(
            cls, "__setattr__"
        )

        if has_own_setattr and is_frozen:
            msg = "Can't freeze a class with a custom __setattr__."
            raise ValueError(msg)

        builder = _ClassBuilder(
            cls,
            these,
            slots,
            is_frozen,
            weakref_slot,
            _determine_whether_to_implement(
                cls,
                getstate_setstate,
                auto_detect,
                ("__getstate__", "__setstate__"),
                default=slots,
            ),
            auto_attribs,
            kw_only,
            cache_hash,
            is_exc,
            collect_by_mro,
            on_setattr,
            has_own_setattr,
            field_transformer,
        )
        if _determine_whether_to_implement(
            cls, repr, auto_detect, ("__repr__",)
        ):
            builder.add_repr(repr_ns)
        if str is True:
            builder.add_str()

        eq = _determine_whether_to_implement(
            cls, eq_, auto_detect, ("__eq__", "__ne__")
        )
        if not is_exc and eq is True:
            builder.add_eq()
        if not is_exc and _determine_whether_to_implement(
            cls, order_, auto_detect, ("__lt__", "__le__", "__gt__", "__ge__")
        ):
            builder.add_order()

        builder.add_setattr()

        nonlocal hash
        if (
            hash is None
            and auto_detect is True
            and _has_own_attribute(cls, "__hash__")
        ):
            hash = False

        if hash is not True and hash is not False and hash is not None:
            # Can't use `hash not in (True, False, None)` because 1 == True, for example.
            msg = "Invalid value for hash.  Must be True, False, or None."
            raise TypeError(msg)

        if hash is False or (hash is None and eq is False) or is_exc:
            # Don't do anything. Should fall back to __object__'s __hash__
            # which is by id.
            if cache_hash:
                msg = "Invalid value for cache_hash.  To use hash caching, hashing must be either explicitly or implicitly enabled."
                raise TypeError(msg)
        elif hash is True or (
            hash is None and eq is True and is_frozen is True
        ):
            # Build a __hash__ if told so, or if it's safe.
            builder.add_hash()
        else:
            # Raise TypeError on attempts to hash.
            if cache_hash:
                msg = "Invalid value for cache_hash.  To use hash caching, hashing must be either explicitly or implicitly enabled."
                raise TypeError(msg)
            builder.make_unhashable()

        if _determine_whether_to_implement(
            cls, init, auto_detect, ("__init__",)
        ):
            builder.add_init()
        else:
            builder.add_attrs_init()
            if cache_hash:
                msg = "Invalid value for cache_hash.  To use hash caching, init must be True."
                raise TypeError(msg)

        if (
            PY_3_10_PLUS
            and match_args
            and not _has_own_attribute(cls, "__match_args__")
        ):
            builder.add_match_args()

        return builder.build_class()

    # maybe_cls's type depends on the usage of the decorator.  It's a class
    # if it's used as `@attrs` but `None` if used as `@attrs()`.
    if maybe_cls is None:
        return wrap

    return wrap(maybe_cls)


_attrs = attrs
"""
Internal alias so we can use it in functions that take an argument called
*attrs*.
"""


def _has_frozen_base_class(cls):
    """
    Check whether *cls* has a frozen ancestor by looking at its
    __setattr__.
    """
    return cls.__setattr__ is _frozen_setattrs


def _generate_unique_filename(cls, func_name):
    """
    Create a "filename" suitable for a function being generated.
    """
    return (
        f"<attrs generated {func_name} {cls.__module__}."
        f"{getattr(cls, '__qualname__', cls.__name__)}>"
    )
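
# Illustrative only (the class `C` is made up; the module part depends on
# where the class is defined, here assumed to be __main__):
#
#     >>> class C:
#     ...     pass
#     >>> _generate_unique_filename(C, "init")
#     '<attrs generated init __main__.C>'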


def _make_hash(cls, attrs, frozen, cache_hash):
    attrs = tuple(
        a for a in attrs if a.hash is True or (a.hash is None and a.eq is True)
    )

    tab = "        "

    unique_filename = _generate_unique_filename(cls, "hash")
    type_hash = hash(unique_filename)
    # If eq is custom generated, we need to include the functions in globs
    globs = {}

    hash_def = "def __hash__(self"
    hash_func = "hash(("
    closing_braces = "))"
    if not cache_hash:
        hash_def += "):"
    else:
        hash_def += ", *"

        hash_def += ", _cache_wrapper=__import__('attr._make')._make._CacheHashWrapper):"
        hash_func = "_cache_wrapper(" + hash_func
        closing_braces += ")"

    method_lines = [hash_def]

    def append_hash_computation_lines(prefix, indent):
        """
        Generate the code for actually computing the hash code.
        Below this will either be returned directly or used to compute
        a value which is then cached, depending on the value of cache_hash
        """

        method_lines.extend(
            [
                indent + prefix + hash_func,
                indent + f"        {type_hash},",
            ]
        )

        for a in attrs:
            if a.eq_key:
                cmp_name = f"_{a.name}_key"
                globs[cmp_name] = a.eq_key
                method_lines.append(
                    indent + f"        {cmp_name}(self.{a.name}),"
                )
            else:
                method_lines.append(indent + f"        self.{a.name},")

        method_lines.append(indent + "    " + closing_braces)

    if cache_hash:
        method_lines.append(tab + f"if self.{_HASH_CACHE_FIELD} is None:")
        if frozen:
            append_hash_computation_lines(
                f"object.__setattr__(self, '{_HASH_CACHE_FIELD}', ", tab * 2
            )
            method_lines.append(tab * 2 + ")")  # close __setattr__
        else:
            append_hash_computation_lines(
                f"self.{_HASH_CACHE_FIELD} = ", tab * 2
            )
        method_lines.append(tab + f"return self.{_HASH_CACHE_FIELD}")
    else:
        append_hash_computation_lines("return ", tab)

    script = "\n".join(method_lines)
    return _make_method("__hash__", script, unique_filename, globs)


def _add_hash(cls, attrs):
    """
    Add a hash method to *cls*.
    """
    cls.__hash__ = _make_hash(cls, attrs, frozen=False, cache_hash=False)
    return cls


def _make_ne():
    """
    Create __ne__ method.
    """

    def __ne__(self, other):
        """
        Check equality and either forward a NotImplemented or
        return the result negated.
        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return NotImplemented

        return not result

    return __ne__


def _make_eq(cls, attrs):
    """
    Create __eq__ method for *cls* with *attrs*.
    """
    attrs = [a for a in attrs if a.eq]

    unique_filename = _generate_unique_filename(cls, "eq")
    lines = [
        "def __eq__(self, other):",
        "    if other.__class__ is not self.__class__:",
        "        return NotImplemented",
    ]

    # We can't just do a big self.x == other.x and ... clause due to
    # irregularities like nan == nan is false but (nan,) == (nan,) is true.
    globs = {}
    if attrs:
        lines.append("    return  (")
        for a in attrs:
            if a.eq_key:
                cmp_name = f"_{a.name}_key"
                # Add the key function to the global namespace
                # of the evaluated function.
                globs[cmp_name] = a.eq_key
                lines.append(
                    f"        {cmp_name}(self.{a.name}) == {cmp_name}(other.{a.name})"
                )
            else:
                lines.append(f"        self.{a.name} == other.{a.name}")
            if a is not attrs[-1]:
                lines[-1] = f"{lines[-1]} and"
        lines.append("    )")
    else:
        lines.append("    return True")

    script = "\n".join(lines)

    return _make_method("__eq__", script, unique_filename, globs)


def _make_order(cls, attrs):
    """
    Create ordering methods for *cls* with *attrs*.
    """
    attrs = [a for a in attrs if a.order]

    def attrs_to_tuple(obj):
        """
        Save us some typing.
        """
        return tuple(
            key(value) if key else value
            for value, key in (
                (getattr(obj, a.name), a.order_key) for a in attrs
            )
        )

    def __lt__(self, other):
        """
        Automatically created by attrs.
        """
        if other.__class__ is self.__class__:
            return attrs_to_tuple(self) < attrs_to_tuple(other)

        return NotImplemented

    def __le__(self, other):
        """
        Automatically created by attrs.
        """
        if other.__class__ is self.__class__:
            return attrs_to_tuple(self) <= attrs_to_tuple(other)

        return NotImplemented

    def __gt__(self, other):
        """
        Automatically created by attrs.
        """
        if other.__class__ is self.__class__:
            return attrs_to_tuple(self) > attrs_to_tuple(other)

        return NotImplemented

    def __ge__(self, other):
        """
        Automatically created by attrs.
        """
        if other.__class__ is self.__class__:
            return attrs_to_tuple(self) >= attrs_to_tuple(other)

        return NotImplemented

    return __lt__, __le__, __gt__, __ge__


def _add_eq(cls, attrs=None):
    """
    Add equality methods to *cls* with *attrs*.
    """
    if attrs is None:
        attrs = cls.__attrs_attrs__

    cls.__eq__ = _make_eq(cls, attrs)
    cls.__ne__ = _make_ne()

    return cls


def _make_repr(attrs, ns, cls):
    unique_filename = _generate_unique_filename(cls, "repr")
    # Figure out which attributes to include, and which function to use to
    # format them. The a.repr value can be either bool or a custom
    # callable.
    attr_names_with_reprs = tuple(
        (a.name, (repr if a.repr is True else a.repr), a.init)
        for a in attrs
        if a.repr is not False
    )
    globs = {
        name + "_repr": r for name, r, _ in attr_names_with_reprs if r != repr
    }
    globs["_compat"] = _compat
    globs["AttributeError"] = AttributeError
    globs["NOTHING"] = NOTHING
    attribute_fragments = []
    for name, r, i in attr_names_with_reprs:
        accessor = (
            "self." + name if i else 'getattr(self, "' + name + '", NOTHING)'
        )
        fragment = (
            "%s={%s!r}" % (name, accessor)
            if r == repr
            else "%s={%s_repr(%s)}" % (name, name, accessor)
        )
        attribute_fragments.append(fragment)
    repr_fragment = ", ".join(attribute_fragments)

    if ns is None:
        cls_name_fragment = '{self.__class__.__qualname__.rsplit(">.", 1)[-1]}'
    else:
        cls_name_fragment = ns + ".{self.__class__.__name__}"

    lines = [
        "def __repr__(self):",
        "  try:",
        "    already_repring = _compat.repr_context.already_repring",
        "  except AttributeError:",
        "    already_repring = {id(self),}",
        "    _compat.repr_context.already_repring = already_repring",
        "  else:",
        "    if id(self) in already_repring:",
        "      return '...'",
        "    else:",
        "      already_repring.add(id(self))",
        "  try:",
        f"    return f'{cls_name_fragment}({repr_fragment})'",
        "  finally:",
        "    already_repring.remove(id(self))",
    ]

    return _make_method(
        "__repr__", "\n".join(lines), unique_filename, globs=globs
    )


def _add_repr(cls, ns=None, attrs=None):
    """
    Add a repr method to *cls*.
    """
    if attrs is None:
        attrs = cls.__attrs_attrs__

    cls.__repr__ = _make_repr(attrs, ns, cls)
    return cls



def fields(cls):
    """
    Return the tuple of *attrs* attributes for a class.

    The tuple also allows accessing the fields by their names (see below for
    examples).

    Args:
        cls (type): Class to introspect.

    Raises:
        TypeError: If *cls* is not a class.

        attrs.exceptions.NotAnAttrsClassError:
            If *cls* is not an *attrs* class.

    Returns:
        tuple (with name accessors) of `attrs.Attribute`

    .. versionchanged:: 16.2.0 Returned tuple allows accessing the fields
       by name.
    .. versionchanged:: 23.1.0 Add support for generic classes.
    """
    generic_base = get_generic_base(cls)

    if generic_base is None and not isinstance(cls, type):
        msg = "Passed object must be a class."
        raise TypeError(msg)

    attrs = getattr(cls, "__attrs_attrs__", None)

    if attrs is None:
        if generic_base is not None:
            attrs = getattr(generic_base, "__attrs_attrs__", None)
            if attrs is not None:
                # Even though this is global state, stick it on here to speed
                # it up. We rely on `cls` being cached for this to be
                # efficient.
                cls.__attrs_attrs__ = attrs
                return attrs
        msg = f"{cls!r} is not an attrs-decorated class."
        raise NotAnAttrsClassError(msg)

    return attrs


def fields_dict(cls):
    """
    Return an ordered dictionary of *attrs* attributes for a class, whose keys
    are the attribute names.

    Args:
        cls (type): Class to introspect.

    Raises:
        TypeError: If *cls* is not a class.

        attrs.exceptions.NotAnAttrsClassError:
            If *cls* is not an *attrs* class.

    Returns:
        dict[str, attrs.Attribute]: Dict of attribute name to definition

    .. versionadded:: 18.1.0
    """
    if not isinstance(cls, type):
        msg = "Passed object must be a class."
        raise TypeError(msg)

    attrs = getattr(cls, "__attrs_attrs__", None)

    if attrs is None:
        msg = f"{cls!r} is not an attrs-decorated class."
        raise NotAnAttrsClassError(msg)

    return {a.name: a for a in attrs}


def validate(inst):
    """
    Validate all attributes on *inst* that have a validator.

    Lets all exceptions through.

    Args:
        inst: Instance of a class with *attrs* attributes.
    """
    if _config._run_validators is False:
        return

    for a in fields(inst.__class__):
        v = a.validator
        if v is not None:
            v(inst, a, getattr(inst, a.name))
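
# Illustrative usage sketch (not part of the original module), assuming a
# plain attr.s class with a validator on one field; plain attr.s does not
# validate on assignment, which is what `validate` is for.
#
#     >>> import attr
#     >>> @attr.s
#     ... class C:
#     ...     x = attr.ib(validator=attr.validators.instance_of(int))
#     >>> c = C(1)
#     >>> validate(c)    # passes silently
#     >>> c.x = "1"      # assignment is not validated ...
#     >>> validate(c)    # ... but this re-runs the validators
#     Traceback (most recent call last):
#         ...
#     TypeError: ...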


def _is_slot_attr(a_name, base_attr_map):
    """
    Check if the attribute name comes from a slot class.
    """
    cls = base_attr_map.get(a_name)
    return cls and "__slots__" in cls.__dict__


def _make_init(
    cls,
    attrs,
    pre_init,
    pre_init_has_args,
    post_init,
    frozen,
    slots,
    cache_hash,
    base_attr_map,
    is_exc,
    cls_on_setattr,
    attrs_init,
):
    has_cls_on_setattr = (
        cls_on_setattr is not None and cls_on_setattr is not setters.NO_OP
    )

    if frozen and has_cls_on_setattr:
        msg = "Frozen classes can't use on_setattr."
        raise ValueError(msg)

    needs_cached_setattr = cache_hash or frozen
    filtered_attrs = []
    attr_dict = {}
    for a in attrs:
        if not a.init and a.default is NOTHING:
            continue

        filtered_attrs.append(a)
        attr_dict[a.name] = a

        if a.on_setattr is not None:
            if frozen is True:
                msg = "Frozen classes can't use on_setattr."
                raise ValueError(msg)

            needs_cached_setattr = True
        elif has_cls_on_setattr and a.on_setattr is not setters.NO_OP:
            needs_cached_setattr = True

    unique_filename = _generate_unique_filename(cls, "init")

    script, globs, annotations = _attrs_to_init_script(
        filtered_attrs,
        frozen,
        slots,
        pre_init,
        pre_init_has_args,
        post_init,
        cache_hash,
        base_attr_map,
        is_exc,
        needs_cached_setattr,
        has_cls_on_setattr,
        "__attrs_init__" if attrs_init else "__init__",
    )
    if cls.__module__ in sys.modules:
        # This makes typing.get_type_hints(CLS.__init__) resolve string types.
        globs.update(sys.modules[cls.__module__].__dict__)

    globs.update({"NOTHING": NOTHING, "attr_dict": attr_dict})

    if needs_cached_setattr:
        # Save the lookup overhead in __init__ if we need to circumvent
        # setattr hooks.
        globs["_cached_setattr_get"] = _OBJ_SETATTR.__get__

    init = _make_method(
        "__attrs_init__" if attrs_init else "__init__",
        script,
        unique_filename,
        globs,
    )
    init.__annotations__ = annotations
    return init


def _setattr(attr_name: str, value_var: str, has_on_setattr: bool) -> str:
    """
    Use the cached object.setattr to set *attr_name* to *value_var*.
    """
    return f"_setattr('{attr_name}', {value_var})"


def _setattr_with_converter(
    attr_name: str, value_var: str, has_on_setattr: bool, converter: Converter
) -> str:
    """
    Use the cached object.setattr to set *attr_name* to *value_var*, but run
    its converter first.
    """
    return f"_setattr('{attr_name}', {converter._fmt_converter_call(attr_name, value_var)})"


def _assign(attr_name: str, value: str, has_on_setattr: bool) -> str:
    """
    Unless *attr_name* has an on_setattr hook, use normal assignment. Otherwise
    relegate to _setattr.
    """
    if has_on_setattr:
        return _setattr(attr_name, value, True)

    return f"self.{attr_name} = {value}"


def _assign_with_converter(
    attr_name: str, value_var: str, has_on_setattr: bool, converter: Converter
) -> str:
    """
    Unless *attr_name* has an on_setattr hook, use normal assignment after
    conversion. Otherwise relegate to _setattr_with_converter.
    """
    if has_on_setattr:
        return _setattr_with_converter(attr_name, value_var, True, converter)

    return f"self.{attr_name} = {converter._fmt_converter_call(attr_name, value_var)}"


def _determine_setters(
    frozen: bool, slots: bool, base_attr_map: dict[str, type]
):
    """
    Determine the correct setter functions based on whether a class is frozen
    and/or slotted.
    """
    if frozen is True:
        if slots is True:
            return (), _setattr, _setattr_with_converter

        # Dict frozen classes assign directly to __dict__.
        # But only if the attribute doesn't come from an ancestor slot
        # class.
        # Note _inst_dict will be used again below if cache_hash is True
        def fmt_setter(
            attr_name: str, value_var: str, has_on_setattr: bool
        ) -> str:
            if _is_slot_attr(attr_name, base_attr_map):
                return _setattr(attr_name, value_var, has_on_setattr)

            return f"_inst_dict['{attr_name}'] = {value_var}"

        def fmt_setter_with_converter(
            attr_name: str,
            value_var: str,
            has_on_setattr: bool,
            converter: Converter,
        ) -> str:
            if has_on_setattr or _is_slot_attr(attr_name, base_attr_map):
                return _setattr_with_converter(
                    attr_name, value_var, has_on_setattr, converter
                )

            return f"_inst_dict['{attr_name}'] = {converter._fmt_converter_call(attr_name, value_var)}"

        return (
            ("_inst_dict = self.__dict__",),
            fmt_setter,
            fmt_setter_with_converter,
        )

    # Not frozen -- we can just assign directly.
    return (), _assign, _assign_with_converter


def _attrs_to_init_script(
    attrs: list[Attribute],
    is_frozen: bool,
    is_slotted: bool,
    call_pre_init: bool,
    pre_init_has_args: bool,
    call_post_init: bool,
    does_cache_hash: bool,
    base_attr_map: dict[str, type],
    is_exc: bool,
    needs_cached_setattr: bool,
    has_cls_on_setattr: bool,
    method_name: str,
) -> tuple[str, dict, dict]:
    """
    Return a script of an initializer for *attrs*, a dict of globals, and
    annotations for the initializer.

    The globals are required by the generated script.
    """
    lines = ["self.__attrs_pre_init__()"] if call_pre_init else []

    if needs_cached_setattr:
        lines.append(
            # Circumvent the __setattr__ descriptor to save one lookup per
            # assignment. Note _setattr will be used again below if
            # does_cache_hash is True.
            "_setattr = _cached_setattr_get(self)"
        )

    extra_lines, fmt_setter, fmt_setter_with_converter = _determine_setters(
        is_frozen, is_slotted, base_attr_map
    )
    lines.extend(extra_lines)

    args = []
    kw_only_args = []
    attrs_to_validate = []

    # This is a dictionary of names to validator and converter callables.
    # Injecting this into __init__ globals lets us avoid lookups.
    names_for_globals = {}
    annotations = {"return": None}

    for a in attrs:
        if a.validator:
            attrs_to_validate.append(a)

        attr_name = a.name
        has_on_setattr = a.on_setattr is not None or (
            a.on_setattr is not setters.NO_OP and has_cls_on_setattr
        )
        # a.alias is set to maybe-mangled attr_name in _ClassBuilder if not
        # explicitly provided
        arg_name = a.alias

        has_factory = isinstance(a.default, Factory)
        maybe_self = "self" if has_factory and a.default.takes_self else ""

        if a.converter and not isinstance(a.converter, Converter):
            converter = Converter(a.converter)
        else:
            converter = a.converter

        if a.init is False:
            if has_factory:
                init_factory_name = _INIT_FACTORY_PAT % (a.name,)
                if converter is not None:
                    lines.append(
                        fmt_setter_with_converter(
                            attr_name,
                            init_factory_name + f"({maybe_self})",
                            has_on_setattr,
                            converter,
                        )
                    )
                    names_for_globals[converter._get_global_name(a.name)] = (
                        converter.converter
                    )
                else:
                    lines.append(
                        fmt_setter(
                            attr_name,
                            init_factory_name + f"({maybe_self})",
                            has_on_setattr,
                        )
                    )
                names_for_globals[init_factory_name] = a.default.factory
            elif converter is not None:
                lines.append(
                    fmt_setter_with_converter(
                        attr_name,
                        f"attr_dict['{attr_name}'].default",
                        has_on_setattr,
                        converter,
                    )
                )
                names_for_globals[converter._get_global_name(a.name)] = (
                    converter.converter
                )
            else:
                lines.append(
                    fmt_setter(
                        attr_name,
                        f"attr_dict['{attr_name}'].default",
                        has_on_setattr,
                    )
                )
        elif a.default is not NOTHING and not has_factory:
            arg = f"{arg_name}=attr_dict['{attr_name}'].default"
            if a.kw_only:
                kw_only_args.append(arg)
            else:
                args.append(arg)

            if converter is not None:
                lines.append(
                    fmt_setter_with_converter(
                        attr_name, arg_name, has_on_setattr, converter
                    )
                )
                names_for_globals[converter._get_global_name(a.name)] = (
                    converter.converter
                )
            else:
                lines.append(fmt_setter(attr_name, arg_name, has_on_setattr))

        elif has_factory:
            arg = f"{arg_name}=NOTHING"
            if a.kw_only:
                kw_only_args.append(arg)
            else:
                args.append(arg)
            lines.append(f"if {arg_name} is not NOTHING:")

            init_factory_name = _INIT_FACTORY_PAT % (a.name,)
            if converter is not None:
                lines.append(
                    "    "
                    + fmt_setter_with_converter(
                        attr_name, arg_name, has_on_setattr, converter
                    )
                )
                lines.append("else:")
                lines.append(
                    "    "
                    + fmt_setter_with_converter(
                        attr_name,
                        init_factory_name + "(" + maybe_self + ")",
                        has_on_setattr,
                        converter,
                    )
                )
                names_for_globals[converter._get_global_name(a.name)] = (
                    converter.converter
                )
            else:
                lines.append(
                    "    " + fmt_setter(attr_name, arg_name, has_on_setattr)
                )
                lines.append("else:")
                lines.append(
                    "    "
                    + fmt_setter(
                        attr_name,
                        init_factory_name + "(" + maybe_self + ")",
                        has_on_setattr,
                    )
                )
            names_for_globals[init_factory_name] = a.default.factory
        else:
            if a.kw_only:
                kw_only_args.append(arg_name)
            else:
                args.append(arg_name)

            if converter is not None:
                lines.append(
                    fmt_setter_with_converter(
                        attr_name, arg_name, has_on_setattr, converter
                    )
                )
                names_for_globals[converter._get_global_name(a.name)] = (
                    converter.converter
                )
            else:
                lines.append(fmt_setter(attr_name, arg_name, has_on_setattr))

        if a.init is True:
            if a.type is not None and converter is None:
                annotations[arg_name] = a.type
            elif converter is not None and converter._first_param_type:
                # Use the type from the converter if present.
                annotations[arg_name] = converter._first_param_type

    if attrs_to_validate:  # we can skip this if there are no validators.
names_for_globals["_config"] = _config lines.append("if _config._run_validators is True:") for a in attrs_to_validate: val_name = "__attr_validator_" + a.name attr_name = "__attr_" + a.name lines.append(f" {val_name}(self, {attr_name}, self.{a.name})") names_for_globals[val_name] = a.validator names_for_globals[attr_name] = a if call_post_init: lines.append("self.__attrs_post_init__()") # Because this is set only after __attrs_post_init__ is called, a crash # will result if post-init tries to access the hash code. This seemed # preferable to setting this beforehand, in which case alteration to field # values during post-init combined with post-init accessing the hash code # would result in silent bugs. if does_cache_hash: if is_frozen: if is_slotted: init_hash_cache = f"_setattr('{_HASH_CACHE_FIELD}', None)" else: init_hash_cache = f"_inst_dict['{_HASH_CACHE_FIELD}'] = None" else: init_hash_cache = f"self.{_HASH_CACHE_FIELD} = None" lines.append(init_hash_cache) # For exceptions we rely on BaseException.__init__ for proper # initialization. if is_exc: vals = ",".join(f"self.{a.name}" for a in attrs if a.init) lines.append(f"BaseException.__init__(self, {vals})") args = ", ".join(args) pre_init_args = args if kw_only_args: # leading comma & kw_only args args += f"{', ' if args else ''}*, {', '.join(kw_only_args)}" pre_init_kw_only_args = ", ".join( [ f"{kw_arg_name}={kw_arg_name}" # We need to remove the defaults from the kw_only_args. for kw_arg_name in (kwa.split("=")[0] for kwa in kw_only_args) ] ) pre_init_args += ", " if pre_init_args else "" pre_init_args += pre_init_kw_only_args if call_pre_init and pre_init_has_args: # If pre init method has arguments, pass same arguments as `__init__`. lines[0] = f"self.__attrs_pre_init__({pre_init_args})" # Python 3.7 doesn't allow backslashes in f strings. NL = "\n " return ( f"""def {method_name}(self, {args}): {NL.join(lines) if lines else 'pass'} """, names_for_globals, annotations, ) def _default_init_alias_for(name: str) -> str: """ The default __init__ parameter name for a field. This performs private-name adjustment via leading-unscore stripping, and is the default value of Attribute.alias if not provided. """ return name.lstrip("_")
class Attribute:
    """
    *Read-only* representation of an attribute.

    .. warning::

       You should never instantiate this class yourself.

    The class has *all* arguments of `attr.ib` (except for ``factory`` which
    is only syntactic sugar for ``default=Factory(...)``) plus the following:

    - ``name`` (`str`): The name of the attribute.
    - ``alias`` (`str`): The __init__ parameter name of the attribute, after
      any explicit overrides and default private-attribute-name handling.
    - ``inherited`` (`bool`): Whether or not that attribute has been inherited
      from a base class.
    - ``eq_key`` and ``order_key`` (`typing.Callable` or `None`): The
      callables that are used for comparing and ordering objects by this
      attribute, respectively. These are set by passing a callable to
      `attr.ib`'s ``eq``, ``order``, or ``cmp`` arguments. See also
      :ref:`comparison customization <custom-comparison>`.

    Instances of this class are frequently used for introspection purposes
    like:

    - `fields` returns a tuple of them.
    - Validators get them passed as the first argument.
    - The :ref:`field transformer <transform-fields>` hook receives a list of
      them.
    - The ``alias`` property exposes the __init__ parameter name of the field,
      with any overrides and default private-attribute handling applied.

    .. versionadded:: 20.1.0 *inherited*
    .. versionadded:: 20.1.0 *on_setattr*
    .. versionchanged:: 20.2.0 *inherited* is not taken into account for
        equality checks and hashing anymore.
    .. versionadded:: 21.1.0 *eq_key* and *order_key*
    .. versionadded:: 22.2.0 *alias*

    For the full version history of the fields, see `attr.ib`.
    """

    __slots__ = (
        "name",
        "default",
        "validator",
        "repr",
        "eq",
        "eq_key",
        "order",
        "order_key",
        "hash",
        "init",
        "metadata",
        "type",
        "converter",
        "kw_only",
        "inherited",
        "on_setattr",
        "alias",
    )
    def __init__(
        self,
        name,
        default,
        validator,
        repr,
        cmp,  # XXX: unused, remove along with other cmp code.
        hash,
        init,
        inherited,
        metadata=None,
        type=None,
        converter=None,
        kw_only=False,
        eq=None,
        eq_key=None,
        order=None,
        order_key=None,
        on_setattr=None,
        alias=None,
    ):
        eq, eq_key, order, order_key = _determine_attrib_eq_order(
            cmp, eq_key or eq, order_key or order, True
        )

        # Cache this descriptor here to speed things up later.
        bound_setattr = _OBJ_SETATTR.__get__(self)

        # Despite the big red warning, people *do* instantiate `Attribute`
        # themselves.
        bound_setattr("name", name)
        bound_setattr("default", default)
        bound_setattr("validator", validator)
        bound_setattr("repr", repr)
        bound_setattr("eq", eq)
        bound_setattr("eq_key", eq_key)
        bound_setattr("order", order)
        bound_setattr("order_key", order_key)
        bound_setattr("hash", hash)
        bound_setattr("init", init)
        bound_setattr("converter", converter)
        bound_setattr(
            "metadata",
            (
                types.MappingProxyType(dict(metadata))  # Shallow copy
                if metadata
                else _EMPTY_METADATA_SINGLETON
            ),
        )
        bound_setattr("type", type)
        bound_setattr("kw_only", kw_only)
        bound_setattr("inherited", inherited)
        bound_setattr("on_setattr", on_setattr)
        bound_setattr("alias", alias)
    def __setattr__(self, name, value):
        raise FrozenInstanceError()
    @classmethod
    def from_counting_attr(cls, name, ca, type=None):
        # type holds the annotated value. deal with conflicts:
        if type is None:
            type = ca.type
        elif ca.type is not None:
            msg = "Type annotation and type argument cannot both be present"
            raise ValueError(msg)

        inst_dict = {
            k: getattr(ca, k)
            for k in Attribute.__slots__
            if k
            not in (
                "name",
                "validator",
                "default",
                "type",
                "inherited",
            )  # exclude methods and deprecated alias
        }

        return cls(
            name=name,
            validator=ca._validator,
            default=ca._default,
            type=type,
            cmp=None,
            inherited=False,
            **inst_dict,
        )
    # Don't use attrs.evolve since fields(Attribute) doesn't work
    def evolve(self, **changes):
        """
        Copy *self* and apply *changes*.

        This works similarly to `attrs.evolve` but that function does not work
        with `Attribute`.

        It is mainly meant to be used for `transform-fields`.

        .. versionadded:: 20.3.0
        """
        new = copy.copy(self)

        new._setattrs(changes.items())

        return new
    # Don't use _add_pickle since fields(Attribute) doesn't work
    def __getstate__(self):
        """
        Play nice with pickle.
        """
        return tuple(
            getattr(self, name) if name != "metadata" else dict(self.metadata)
            for name in self.__slots__
        )
    def __setstate__(self, state):
        """
        Play nice with pickle.
        """
        self._setattrs(zip(self.__slots__, state))
    def _setattrs(self, name_values_pairs):
        bound_setattr = _OBJ_SETATTR.__get__(self)
        for name, value in name_values_pairs:
            if name != "metadata":
                bound_setattr(name, value)
            else:
                bound_setattr(
                    name,
                    (
                        types.MappingProxyType(dict(value))
                        if value
                        else _EMPTY_METADATA_SINGLETON
                    ),
                )
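# A hypothetical sketch (never called) of how Attribute instances surface in
# user code: `fields` returns them, they are read-only, and `evolve` copies
# one with changes (the typical pattern inside a field transformer).
def _example_attribute_introspection():
    @attrs(slots=True)
    class Point:
        x = attrib(default=0)

    field = fields(Point)[0]            # an Attribute instance
    renamed = field.evolve(alias="x_")  # copy with a different __init__ alias
    assert renamed.alias == "x_"
    try:
        field.name = "y"                # Attribute itself is frozen
    except FrozenInstanceError:
        pass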
_a = [
    Attribute(
        name=name,
        default=NOTHING,
        validator=None,
        repr=True,
        cmp=None,
        eq=True,
        order=False,
        hash=(name != "metadata"),
        init=True,
        inherited=False,
        alias=_default_init_alias_for(name),
    )
    for name in Attribute.__slots__
]

Attribute = _add_hash(
    _add_eq(
        _add_repr(Attribute, attrs=_a),
        attrs=[a for a in _a if a.name != "inherited"],
    ),
    attrs=[a for a in _a if a.hash and a.name != "inherited"],
)


class _CountingAttr:
    """
    Intermediate representation of attributes that uses a counter to preserve
    the order in which the attributes have been defined.

    *Internal* data structure of the attrs library.  Running into it is most
    likely the result of a bug like a forgotten `@attr.s` decorator.
    """

    __slots__ = (
        "counter",
        "_default",
        "repr",
        "eq",
        "eq_key",
        "order",
        "order_key",
        "hash",
        "init",
        "metadata",
        "_validator",
        "converter",
        "type",
        "kw_only",
        "on_setattr",
        "alias",
    )
    __attrs_attrs__ = (
        *tuple(
            Attribute(
                name=name,
                alias=_default_init_alias_for(name),
                default=NOTHING,
                validator=None,
                repr=True,
                cmp=None,
                hash=True,
                init=True,
                kw_only=False,
                eq=True,
                eq_key=None,
                order=False,
                order_key=None,
                inherited=False,
                on_setattr=None,
            )
            for name in (
                "counter",
                "_default",
                "repr",
                "eq",
                "order",
                "hash",
                "init",
                "on_setattr",
                "alias",
            )
        ),
        Attribute(
            name="metadata",
            alias="metadata",
            default=None,
            validator=None,
            repr=True,
            cmp=None,
            hash=False,
            init=True,
            kw_only=False,
            eq=True,
            eq_key=None,
            order=False,
            order_key=None,
            inherited=False,
            on_setattr=None,
        ),
    )
    cls_counter = 0

    def __init__(
        self,
        default,
        validator,
        repr,
        cmp,
        hash,
        init,
        converter,
        metadata,
        type,
        kw_only,
        eq,
        eq_key,
        order,
        order_key,
        on_setattr,
        alias,
    ):
        _CountingAttr.cls_counter += 1
        self.counter = _CountingAttr.cls_counter
        self._default = default
        self._validator = validator
        self.converter = converter
        self.repr = repr
        self.eq = eq
        self.eq_key = eq_key
        self.order = order
        self.order_key = order_key
        self.hash = hash
        self.init = init
        self.metadata = metadata
        self.type = type
        self.kw_only = kw_only
        self.on_setattr = on_setattr
        self.alias = alias

    def validator(self, meth):
        """
        Decorator that adds *meth* to the list of validators.

        Returns *meth* unchanged.

        .. versionadded:: 17.1.0
        """
        if self._validator is None:
            self._validator = meth
        else:
            self._validator = and_(self._validator, meth)
        return meth

    def default(self, meth):
        """
        Decorator that allows setting the default for an attribute.

        Returns *meth* unchanged.

        Raises:
            DefaultAlreadySetError: If a default has been set before.

        .. versionadded:: 17.1.0
        """
        if self._default is not NOTHING:
            raise DefaultAlreadySetError()

        self._default = Factory(meth, takes_self=True)

        return meth


_CountingAttr = _add_eq(_add_repr(_CountingAttr))
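# A hypothetical sketch (never called) of the decorator API backed by
# _CountingAttr: `attrib()` returns a _CountingAttr whose `default` and
# `validator` methods are used as decorators until `@attrs` replaces it with
# a real Attribute.
def _example_counting_attr_decorators():
    @attrs(slots=True)
    class Token:
        value = attrib()

        @value.default
        def _value_default(self):
            return "anonymous"

        @value.validator
        def _value_not_empty(self, field, val):
            if not val:
                raise ValueError(f"{field.name} must not be empty")

    assert Token().value == "anonymous"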
class Factory:
    """
    Stores a factory callable.

    If passed as the default value to `attrs.field`, the factory is used to
    generate a new value.

    Args:
        factory (typing.Callable):
            A callable that takes either none or exactly one mandatory
            positional argument depending on *takes_self*.

        takes_self (bool):
            Pass the partially initialized instance that is being initialized
            as a positional argument.

    .. versionadded:: 17.1.0  *takes_self*
    """

    __slots__ = ("factory", "takes_self")
    def __init__(self, factory, takes_self=False):
        self.factory = factory
        self.takes_self = takes_self
    def __getstate__(self):
        """
        Play nice with pickle.
        """
        return tuple(getattr(self, name) for name in self.__slots__)
    def __setstate__(self, state):
        """
        Play nice with pickle.
        """
        for name, value in zip(self.__slots__, state):
            setattr(self, name, value)
_f = [
    Attribute(
        name=name,
        default=NOTHING,
        validator=None,
        repr=True,
        cmp=None,
        eq=True,
        order=False,
        hash=True,
        init=True,
        inherited=False,
    )
    for name in Factory.__slots__
]

Factory = _add_hash(_add_eq(_add_repr(Factory, attrs=_f), attrs=_f), attrs=_f)
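# A hypothetical sketch (never called) of Factory as a field default: every
# instance gets a fresh list, and a *takes_self* factory can derive its value
# from fields that were already initialized.
def _example_factory_defaults():
    @attrs(slots=True)
    class Basket:
        items = attrib(default=Factory(list))
        size = attrib(
            default=Factory(lambda self: len(self.items), takes_self=True)
        )

    assert Basket().items == []
    assert Basket(items=[1, 2]).size == 2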
class Converter:
    """
    Stores a converter callable.

    Allows for the wrapped converter to take additional arguments. The
    arguments are passed in the order they are documented.

    Args:
        converter (Callable): A callable that converts the passed value.

        takes_self (bool):
            Pass the partially initialized instance that is being initialized
            as a positional argument. (default: `False`)

        takes_field (bool):
            Pass the field definition (an :class:`Attribute`) into the
            converter as a positional argument. (default: `False`)

    .. versionadded:: 24.1.0
    """

    __slots__ = (
        "converter",
        "takes_self",
        "takes_field",
        "_first_param_type",
        "_global_name",
        "__call__",
    )
    def __init__(self, converter, *, takes_self=False, takes_field=False):
        self.converter = converter
        self.takes_self = takes_self
        self.takes_field = takes_field

        ex = _AnnotationExtractor(converter)
        self._first_param_type = ex.get_first_param_type()

        if not (self.takes_self or self.takes_field):
            self.__call__ = lambda value, _, __: self.converter(value)
        elif self.takes_self and not self.takes_field:
            self.__call__ = lambda value, instance, __: self.converter(
                value, instance
            )
        elif not self.takes_self and self.takes_field:
            self.__call__ = lambda value, __, field: self.converter(
                value, field
            )
        else:
            self.__call__ = lambda value, instance, field: self.converter(
                value, instance, field
            )

        rt = ex.get_return_type()
        if rt is not None:
            self.__call__.__annotations__["return"] = rt
    @staticmethod
    def _get_global_name(attr_name: str) -> str:
        """
        Return the name that a converter for an attribute name *attr_name*
        would have.
        """
        return f"__attr_converter_{attr_name}"

    def _fmt_converter_call(self, attr_name: str, value_var: str) -> str:
        """
        Return a string that calls the converter for an attribute name
        *attr_name* and the value in the variable named *value_var* according
        to `self.takes_self` and `self.takes_field`.
        """
        if not (self.takes_self or self.takes_field):
            return f"{self._get_global_name(attr_name)}({value_var})"

        if self.takes_self and self.takes_field:
            return f"{self._get_global_name(attr_name)}({value_var}, self, attr_dict['{attr_name}'])"

        if self.takes_self:
            return f"{self._get_global_name(attr_name)}({value_var}, self)"

        return f"{self._get_global_name(attr_name)}({value_var}, attr_dict['{attr_name}'])"
    def __getstate__(self):
        """
        Return a dict containing only converter, takes_self, and takes_field
        -- the rest gets computed when loading.
        """
        return {
            "converter": self.converter,
            "takes_self": self.takes_self,
            "takes_field": self.takes_field,
        }
    def __setstate__(self, state):
        """
        Load instance from state.
        """
        self.__init__(**state)
_f = [
    Attribute(
        name=name,
        default=NOTHING,
        validator=None,
        repr=True,
        cmp=None,
        eq=True,
        order=False,
        hash=True,
        init=True,
        inherited=False,
    )
    for name in ("converter", "takes_self", "takes_field")
]

Converter = _add_hash(
    _add_eq(_add_repr(Converter, attrs=_f), attrs=_f), attrs=_f
)
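# A hypothetical sketch (never called) of Converter with its extra arguments:
# the wrapped callable may also receive the (partially initialized) instance
# and the Attribute being converted.
def _example_converter_with_field():
    def clip(value, inst, field):
        # `field` is the Attribute; `inst` is the instance under construction.
        return max(0, int(value))

    @attrs(slots=True)
    class Gauge:
        level = attrib(
            converter=Converter(clip, takes_self=True, takes_field=True)
        )

    assert Gauge("-3").level == 0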
def make_class(
    name, attrs, bases=(object,), class_body=None, **attributes_arguments
):
    r"""
    A quick way to create a new class called *name* with *attrs*.

    Args:
        name (str): The name for the new class.

        attrs (list | dict):
            A list of names or a dictionary of mappings of names to
            `attr.ib`\ s / `attrs.field`\ s.

            The order is deduced from the order of the names or attributes
            inside *attrs*.  Otherwise the order of the definition of the
            attributes is used.

        bases (tuple[type, ...]): Classes that the new class will subclass.

        class_body (dict):
            An optional dictionary of class attributes for the new class.

        attributes_arguments: Passed unmodified to `attr.s`.

    Returns:
        type: A new class with *attrs*.

    .. versionadded:: 17.1.0 *bases*
    .. versionchanged:: 18.1.0 If *attrs* is ordered, the order is retained.
    .. versionchanged:: 23.2.0 *class_body*
    """
    if isinstance(attrs, dict):
        cls_dict = attrs
    elif isinstance(attrs, (list, tuple)):
        cls_dict = {a: attrib() for a in attrs}
    else:
        msg = "attrs argument must be a dict or a list."
        raise TypeError(msg)

    pre_init = cls_dict.pop("__attrs_pre_init__", None)
    post_init = cls_dict.pop("__attrs_post_init__", None)
    user_init = cls_dict.pop("__init__", None)

    body = {}
    if class_body is not None:
        body.update(class_body)
    if pre_init is not None:
        body["__attrs_pre_init__"] = pre_init
    if post_init is not None:
        body["__attrs_post_init__"] = post_init
    if user_init is not None:
        body["__init__"] = user_init

    type_ = types.new_class(name, bases, {}, lambda ns: ns.update(body))

    # For pickling to work, the __module__ variable needs to be set to the
    # frame where the class is created.  Bypass this step in environments
    # where sys._getframe is not defined (Jython for example) or sys._getframe
    # is not defined for arguments greater than 0 (IronPython).
    with contextlib.suppress(AttributeError, ValueError):
        type_.__module__ = sys._getframe(1).f_globals.get(
            "__name__", "__main__"
        )

    # We do it here for proper warnings with meaningful stacklevel.
    cmp = attributes_arguments.pop("cmp", None)
    (
        attributes_arguments["eq"],
        attributes_arguments["order"],
    ) = _determine_attrs_eq_order(
        cmp,
        attributes_arguments.get("eq"),
        attributes_arguments.get("order"),
        True,
    )

    cls = _attrs(these=cls_dict, **attributes_arguments)(type_)
    # Only add type annotations now or "_attrs()" will complain:
    cls.__annotations__ = {
        k: v.type for k, v in cls_dict.items() if v.type is not None
    }
    return cls
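# A hypothetical sketch (never called) of make_class: classes can be built
# from a plain list of names or from a dict of field definitions.
def _example_make_class():
    Point = make_class("Point", ["x", "y"], frozen=True)
    Line = make_class(
        "Line", {"start": attrib(default=0), "end": attrib(default=0)}
    )

    assert Point(1, 2).x == 1
    assert Line().end == 0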
# These are required within this module so we define them here and merely
# import into .validators / .converters.


@attrs(slots=True, unsafe_hash=True)
class _AndValidator:
    """
    Compose many validators into a single one.
    """

    _validators = attrib()

    def __call__(self, inst, attr, value):
        for v in self._validators:
            v(inst, attr, value)


def and_(*validators):
    """
    A validator that composes multiple validators into one.

    When called on a value, it runs all wrapped validators.

    Args:
        validators (~collections.abc.Iterable[typing.Callable]):
            Arbitrary number of validators.

    .. versionadded:: 17.1.0
    """
    vals = []
    for validator in validators:
        vals.extend(
            validator._validators
            if isinstance(validator, _AndValidator)
            else [validator]
        )

    return _AndValidator(tuple(vals))


def pipe(*converters):
    """
    A converter that composes multiple converters into one.

    When called on a value, it runs all wrapped converters, returning the
    *last* value.

    Type annotations will be inferred from the wrapped converters, if they
    have any.

    Args:
        converters (~collections.abc.Iterable[typing.Callable]):
            Arbitrary number of converters.

    .. versionadded:: 20.1.0
    """

    def pipe_converter(val, inst, field):
        for c in converters:
            val = c(val, inst, field) if isinstance(c, Converter) else c(val)

        return val

    if not converters:
        # If the converter list is empty, pipe_converter is the identity.
        A = typing.TypeVar("A")
        pipe_converter.__annotations__.update({"val": A, "return": A})
    else:
        # Get parameter type from first converter.
        t = _AnnotationExtractor(converters[0]).get_first_param_type()
        if t:
            pipe_converter.__annotations__["val"] = t

        last = converters[-1]
        if not PY_3_11_PLUS and isinstance(last, Converter):
            last = last.__call__

        # Get return type from last converter.
        rt = _AnnotationExtractor(last).get_return_type()
        if rt:
            pipe_converter.__annotations__["return"] = rt

    return Converter(pipe_converter, takes_self=True, takes_field=True)
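# A hypothetical sketch (never called) of the two composition helpers defined
# above: `and_` chains validators and `pipe` chains converters; both are
# re-exported through .validators / .converters as noted above.
def _example_compose():
    def not_empty(inst, field, value):
        if not value:
            raise ValueError(f"{field.name} must not be empty")

    def all_strings(inst, field, value):
        if not all(isinstance(v, str) for v in value):
            raise ValueError(f"{field.name} must contain only strings")

    @attrs(slots=True)
    class Team:
        members = attrib(
            converter=pipe(tuple),
            validator=and_(not_empty, all_strings),
        )

    assert Team(["a", "b"]).members == ("a", "b")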