Source code for photonforge.models.data

import io
import struct
import warnings
from collections.abc import Sequence
from typing import Any, Literal

import numpy

from .. import typing as pft
from ..cache import cache_s_matrix
from ..extension import (
    Component,
    Interpolator,
    Model,
    Path,
    Port,
    PortSpec,
    Rectangle,
    SMatrix,
    Technology,
    _from_bytes,
    config,
    frequency_classification,
    pole_residue_fit,
    register_model_class,
)
from .analytic import _add_bb_text, _bb_layer

InterpolationMethod = Literal["linear", "barycentric", "cubicspline", "pchip", "akima", "makima"]
InterpolationCoords = Literal["real_imag", "mag_phase"]

SMatrixElements = dict[tuple[str, str], numpy.ndarray]


def _s_matrix_elements(
    s_array: numpy.ndarray, keys: dict[tuple[str, str], int] | None
) -> SMatrixElements:
    if keys is None:
        return s_array
    return {key: s_array[:, index] for key, index in keys.items()}


[docs] class DataModel(Model): """Model based on existing S matrix data. Args: s_matrix: Model data as an :class:`SMatrix` instance. s_array: Complex array with dimensions ``(F, N, N)``, in which ``N`` is the number of ports. frequencies: Frequency array with length ``F``. ports: List of port names. If not set, the *sorted* list of port components is used. interpolation_method: Interpolation method used for sampling frequencies. See table below for options. interpolation_coords: Coordinate system used for interpolation. One of ``"mag_phase"`` or ``"real_imag"``. Not used for ``"poleresidue"`` interpolation. poleresidue_kwargs: Keyword arguments to :func:`pole_residue_fit` used when using ``"poleresidue"`` interpolation. When ``s_matrix`` is provided, ``s_array``, ``frequencies``, and ``ports`` should be ``None``, otherwise only ``ports`` is optional. ===================== ================================================ Interpolation method Description ===================== ================================================ ``"linear"`` Linear interpolation between neighboring points ``"barycentric"`` Barycentric Lagrange interpolation ``"cubicspline"`` Cubic spline interpolation ``"pchip"`` Piecewise cubic Hermite interpolating polynomial ``"akima"`` Akima interpolation ``"makima"`` Modified Akima interpolation ``"poleresidue"`` Pole-residue fitting ===================== ================================================ Note: The conversion from array to dictionary for ``s_data`` is equivalent to ``s_dict[(ports[i], ports[j])] = s_array[:, j, i]``. See also: `Data Model guide <../guides/Data_Model.ipynb>`__ """ def __init__( self, s_matrix: SMatrix | None = None, s_array: pft.array(complex, 3) | None = None, frequencies: Sequence[pft.Frequency] | None = None, ports: Sequence[str] | None = None, interpolation_method: InterpolationMethod | Literal["poleresidue"] = "linear", interpolation_coords: InterpolationCoords = "mag_phase", poleresidue_kwargs: pft.kwargs_for(pole_residue_fit) = {}, ): super().__init__( s_matrix=s_matrix, s_array=s_array, frequencies=frequencies, ports=ports, interpolation_method=interpolation_method, interpolation_coords=interpolation_coords, poleresidue_kwargs=poleresidue_kwargs, ) if ( interpolation_method != "poleresidue" and interpolation_method not in InterpolationMethod.__args__ ): raise ValueError( "'interpolation_method' must be one of '" + "', '".join(InterpolationMethod.__args__) + "', 'poleresidue'." ) if interpolation_coords not in InterpolationCoords.__args__: raise ValueError( "'interpolation_coords' must be one of '" + "', '".join(InterpolationCoords.__args__) + "'." ) self.interpolation_method = interpolation_method self.interpolation_coords = interpolation_coords self.poleresidue_kwargs = poleresidue_kwargs self._poleresidue_matrix = None if s_matrix is None and (s_array is None or frequencies is None): raise RuntimeError( "Please provide either 's_matrix' or both 's_array' and 'frequencies'." ) if s_matrix is not None: if ports is not None: warnings.warn( "Argument 'ports' is ignored when 's_matrix' is provided. Using names from " "'s_matrix.ports' instead.", stacklevel=2, ) if s_array is not None: warnings.warn( "Argument 's_array' is ignored when 's_matrix' is provided.", stacklevel=2 ) if frequencies is not None: warnings.warn( "Argument 'frequencies' is ignored when 's_matrix' is provided.", stacklevel=2 ) self.frequencies = s_matrix.frequencies self.ports = sorted(s_matrix.ports) elements = s_matrix.elements sorted_keys = sorted(elements.keys()) self.keys = {k: i for i, k in enumerate(sorted_keys)} self.dimension = 0 self.s_array = numpy.array([elements[k] for k in sorted_keys], dtype=complex) else: self.frequencies = numpy.array(frequencies, dtype=float, ndmin=1) self.ports = ports self.keys = None s_array = numpy.array(s_array, dtype=complex) shape = s_array.shape if len(shape) != 3 or shape[1] != shape[2] or shape[0] != self.frequencies.size: raise RuntimeError( "S matrix must be of shape (F, N, N), with F being the length of frequencies." ) if ports is not None and len(ports) != s_array.shape[1]: raise RuntimeError( "The number of port names must match the S matrix dimension " f"({self.s_array.shape[0]})." ) self.dimension = shape[1] self.s_array = s_array.reshape((shape[0], shape[1] ** 2)).T self._interpolator = ( None if interpolation_method == "poleresidue" else Interpolator( self.frequencies, self.s_array, interpolation_method, interpolation_coords ) )
[docs] def black_box_component( self, port_spec: str | PortSpec | Sequence[str | PortSpec] | None = None, technology: Technology | None = None, name: str | None = None, ) -> Component: """Create a black-box component using this model for testing. Args: port_spec: Port specification used in the component. If ``None``, look for ``"port_spec"`` in :attr:`config.default_kwargs`. technology: Component technology. If ``None``, the default technology is used. name: Component name. If ``None`` a default is used. Returns: Component with ports and model. """ model_name = self.__class__.__name__[:-5] component = Component(f"BB{model_name}" if name is None else name, technology=technology) if port_spec is None: port_spec = config.default_kwargs.get("port_spec") if port_spec is None: raise RuntimeError("Missing argument 'port_spec'.") if isinstance(port_spec, str): name = port_spec port_spec = component.technology.ports.get(name) if port_spec is None: raise RuntimeError(f"Port spec '{name}' not found in component's technology.") if not isinstance(port_spec, PortSpec): port_spec = list(port_spec) num_ports = len(port_spec) for i in range(len(port_spec)): if isinstance(port_spec[i], str): name = port_spec[i] port_spec[i] = component.technology.ports.get(name) if port_spec[i] is None: raise RuntimeError( f"Port spec '{name}' not found in component's technology." ) if self.keys is None: if isinstance(port_spec, PortSpec): num_ports = self.dimension // port_spec.num_modes if num_ports * port_spec.num_modes != self.dimension: raise RuntimeError( f"The number of modes of 'port_spec' ({port_spec.num_modes}) is not a " f"divisor of the S array dimension ({self.dimension}). Cannot calculate " f"the number of ports." ) port_spec = [port_spec] * num_ports elif sum(s.num_modes for s in port_spec) != self.dimension: raise RuntimeError( f"The total number of modes in the 'port_spec' sequence" f"({sum(s.num_modes for s in port_spec)}) does not match the S array dimension " f"({self.dimension})." ) elif isinstance(port_spec, PortSpec): num_ports = len(self.ports) port_spec = [port_spec] * num_ports width = max(s.width for s in port_spec) length = width * 8 port_names = self.ports or [None] * num_ports num_ports = (num_ports // 2, num_ports - num_ports // 2) port_spec = (port_spec[: num_ports[0]], port_spec[num_ports[0] :]) port_names = (port_names[: num_ports[0]], port_names[num_ports[0] :]) i0 = 0.5 * (num_ports[0] - 1) i1 = 0.5 * (num_ports[1] - 1) pos = ( [(0, 1.5 * width * (i - i0)) for i in range(num_ports[0])], [(length, 1.5 * width * (i - i1)) for i in range(num_ports[1])], ) for p, s, n in zip(pos[0], port_spec[0], port_names[0], strict=False): profiles = s.path_profiles_list() if len(profiles) == 0: profiles = [(width, 0, _bb_layer)] for w, g, layer in profiles: component.add(layer, Path(p, w, g).segment((0.25 * length, p[1]))) component.add_port(Port(p, 0, s), n) for p, s, n in zip(pos[1], port_spec[1], port_names[1], strict=False): profiles = s.path_profiles_list() if len(profiles) == 0: profiles = [(width, 0, _bb_layer)] for w, g, layer in profiles: component.add(layer, Path(p, w, g).segment((0.75 * length, p[1]))) component.add_port(Port(p, 180, s), n) (_, y_min), (_, y_max) = component.bounds() component.add(_bb_layer, Rectangle((0.25 * length, y_min), (0.75 * length, y_max))) _add_bb_text(component, width) component.add_model(self, model_name) return component
[docs] @cache_s_matrix def start(self, component: Component, frequencies: Sequence[float], **kwargs: Any) -> SMatrix: """Start computing the S matrix response from a component. Args: component: Component from which to compute the S matrix. frequencies: Sequence of frequencies at which to perform the computation. **kwargs: Unused. Returns: Model result with attributes ``status`` and ``s_matrix``. """ frequencies = numpy.array(frequencies, dtype=float, ndmin=1) classification = frequency_classification(frequencies) component_ports = { name: port.copy(True) for name, port in component.select_ports(classification).items() } if self.ports is None: port_names = sorted(component_ports) else: port_names = self.ports if not all(name in component_ports for name in port_names): raise RuntimeError( f"Not all port names defined in DataModel match the {classification} port " f"names in component '{component.name}'." ) s_array = ( self.s_array if self.interpolation_method == "poleresidue" else self._interpolator(frequencies) ) if self.keys is None: ports = tuple( f"{name}@{mode}" for name in port_names for mode in range(component_ports[name].num_modes) ) if len(ports) != self.dimension: raise RuntimeError( f"DataModel S matrix has dimension {self.dimension}, but component " f"'{component.name}' has {len(ports)} ports/modes." ) elements = { (port_in, port_out): numpy.copy(s_array[self.dimension * j + i]) for i, port_in in enumerate(ports) for j, port_out in enumerate(ports) } else: elements = {} for port_in in port_names: for port_out in port_names: for mode_in in range(component_ports[port_in].num_modes): for mode_out in range(component_ports[port_out].num_modes): key = (f"{port_in}@{mode_in}", f"{port_out}@{mode_out}") index = self.keys.get(key) if index is not None: elements[key] = numpy.copy(s_array[index]) if self.interpolation_method == "poleresidue": if ( self._poleresidue_matrix is None or self._poleresidue_matrix.ports != component_ports ): s_matrix = SMatrix(self.frequencies, elements, component_ports) self._poleresidue_matrix, _ = pole_residue_fit(s_matrix, **self.poleresidue_kwargs) s_matrix = self._poleresidue_matrix(frequencies) else: s_matrix = SMatrix(frequencies, elements, component_ports) return s_matrix
# Deprecated: kept for backwards compatibility with old phf files
[docs] @classmethod def from_bytes(cls, byte_repr: bytes) -> "DataModel": """De-serialize this model.""" version = byte_repr[0] if version == 1: obj = dict(_from_bytes(byte_repr[1:])) keys = obj.pop("keys") if keys is not None: elements = _s_matrix_elements(numpy.array(obj.pop("s_array")), keys) ports = dict.fromkeys(obj.pop("ports")) s_matrix = SMatrix(obj.pop("frequencies"), elements, ports) obj["s_matrix"] = s_matrix elif version == 0: head_size = 1 + struct.calcsize("<2Q") keys_len, ports_len = struct.unpack("<2Q", byte_repr[1:head_size]) num_parts = 2 * keys_len + ports_len + 4 lengths_size = struct.calcsize(f"<{num_parts}Q") lengths = struct.unpack( f"<{num_parts}Q", byte_repr[head_size : head_size + lengths_size] ) cursor = head_size + lengths_size if cursor + sum(lengths) != len(byte_repr): raise RuntimeError("Invalid byte representation for DataModel.") keys = None if keys_len == 0 else {} ports = None if ports_len == 0 else [] for _ in range(keys_len): p0 = byte_repr[cursor : cursor + lengths[0]].decode("utf-8") cursor += lengths[0] p1 = byte_repr[cursor : cursor + lengths[1]].decode("utf-8") cursor += lengths[1] keys[(p0, p1)] = len(keys) lengths = lengths[2:] for _ in range(ports_len): ports.append(byte_repr[cursor : cursor + lengths[0]].decode("utf-8")) cursor += lengths[0] lengths = lengths[1:] mem_io = io.BytesIO() mem_io.write(byte_repr[cursor : cursor + lengths[0]]) mem_io.seek(0) frequencies = numpy.load(mem_io) cursor += lengths[0] mem_io = io.BytesIO() mem_io.write(byte_repr[cursor : cursor + lengths[1]]) mem_io.seek(0) s_array = numpy.load(mem_io) cursor += lengths[1] interpolation_method = byte_repr[cursor : cursor + lengths[2]].decode("utf-8") cursor += lengths[2] interpolation_coords = byte_repr[cursor : cursor + lengths[3]].decode("utf-8") cursor += lengths[3] if keys is not None: elements = _s_matrix_elements(s_array, keys) s_matrix = SMatrix(frequencies, elements, dict.fromkeys(ports)) obj = { "s_matrix": s_matrix, "interpolation_method": interpolation_method, "interpolation_coords": interpolation_coords, } else: obj = { "frequencies": frequencies, "s_array": s_array, "ports": ports, "interpolation_method": interpolation_method, "interpolation_coords": interpolation_coords, } else: raise RuntimeError("Unsuported DataModel version.") return cls(**obj)
register_model_class(DataModel)