Source code for photonforge.models.circuit

import struct
import threading
import time
from collections.abc import Sequence
from typing import Any, Literal

import numpy
from scipy.optimize import minimize

from ..cache import _mode_overlap_cache, cache_s_matrix
from ..circuit_base import _process_component_netlist
from ..extension import (
    Component,
    Model,
    Path,
    Port,
    SMatrix,
    _connect_s_matrices,
    config,
    frequency_classification,
    register_model_class,
)
from ..typing import PositiveFloat
from ..utils import _align_and_overlap, _gather_status
from .tidy3d import Tidy3DModel
from .waveguide import WaveguideModel


class _CircuitModelRunner:
    def __init__(
        self,
        runners: dict[object, object],
        frequencies: Sequence[float],
        component_name: str,
        ports: dict[str, Port],
        port_connections: dict[str, tuple[int, str, int]],
        connections: Sequence[tuple[tuple[int, str, int], tuple[int, str, int]]],
        instance_port_data: Sequence[tuple[object, object]],
        cost_estimation: bool,
    ) -> None:
        self.runners = runners
        self.frequencies = frequencies
        self.component_name = component_name
        self.ports = ports
        self.port_connections = port_connections
        self.connections = connections
        self.instance_port_data = instance_port_data
        self.cost_estimation = cost_estimation

        self.lock = threading.Lock()
        self._s_matrix = None
        self._status = {"progress": 0, "message": "running", "tasks": {}}

        self.thread = threading.Thread(daemon=True, target=self._run_and_monitor_task)
        self.thread.start()

    def _run_and_monitor_task(self) -> None:
        task_status = _gather_status(*self.runners.values())
        w_tasks = 3 * len(task_status["tasks"])
        n_ports = len(self.instance_port_data)
        n_connections = len(self.connections)
        denominator = max(1, w_tasks + n_ports + n_connections)

        with self.lock:
            self._status = dict(task_status)
            self._status["progress"] *= w_tasks / denominator

        while task_status["message"] == "running":
            time.sleep(0.3)
            task_status = _gather_status(*self.runners.values())
            with self.lock:
                self._status = dict(task_status)
                self._status["progress"] *= w_tasks / denominator

        if task_status["message"] == "error":
            with self.lock:
                self._status = task_status
            return

        with self.lock:
            self._status = task_status
            if self.cost_estimation:
                return
            self._status["message"] = "running"
            self._status["progress"] *= w_tasks / denominator

        s_dict = {}
        for index, (instance_ports, instance_keys) in enumerate(self.instance_port_data):
            # Check if reference is needed
            if instance_ports is None:
                continue

            runner = self.runners[index]
            s_matrix = runner if isinstance(runner, SMatrix) else runner.s_matrix
            if s_matrix is None:
                with self.lock:
                    self._status["message"] = "error"
                return

            # Fix port phases if a rotation is applied
            mode_factor = {}
            if instance_keys is not None:
                for port_name, port in instance_ports:
                    key = instance_keys.get(port_name)
                    if key is None:
                        continue

                    # Port mode
                    overlap = _mode_overlap_cache[key]
                    if overlap is None:
                        overlap = _align_and_overlap(
                            self.runners[(index, port_name, 0)].data,
                            self.runners[(index, port_name, 1)].data,
                        )[0]
                        _mode_overlap_cache[key] = overlap

                    for mode in range(port.num_modes):
                        mode_factor[f"{port_name}@{mode}"] = overlap[mode]

            for (i, j), s_ji in s_matrix.elements.items():
                s_dict[(index, i), (index, j)] = (
                    s_ji * mode_factor.get(i, 1.0) / mode_factor.get(j, 1.0)
                )

            with self.lock:
                self._status["progress"] = 100 * (w_tasks + index + 1) / denominator

        s_dict = _connect_s_matrices(s_dict, self.connections, len(self.instance_port_data))

        # Build S matrix with desired ports
        ports = {
            (index, f"{ref_name}@{n}"): f"{port_name}@{n}"
            for (index, ref_name, modes), port_name in self.port_connections.items()
            for n in range(modes)
        }

        elements = {
            (ports[i], ports[j]): s_ji
            for (i, j), s_ji in s_dict.items()
            if i in ports and j in ports
        }

        with self.lock:
            self._s_matrix = SMatrix(self.frequencies, elements, self.ports)
            self._status["progress"] = 100
            self._status["message"] = "success"

    @property
    def status(self) -> dict[str, object]:
        with self.lock:
            return self._status

    @property
    def s_matrix(self) -> SMatrix:
        with self.lock:
            return self._s_matrix


[docs] class CircuitModel(Model): """Model based on circuit-level S-parameter calculation. The component is expected to be composed of interconnected references. Scattering parameters are computed based on the S matrices from all references and their interconnections. The S matrix of each reference is calculated based on the active model of the reference's component. Each calculation is preceded by an update to the componoent's technology, the component itself, and its active model by calling :attr:`Reference.update`. They are reset to their original state after the :func:`CircuitModel.start` function is called. Keyword arguents in :attr:`Reference.s_matrix_kwargs` will be passed on to :func:`CircuitModel.start`. If a reference includes repetitions, it is flattened so that each instance is called separately. Connections between incompatible ports (butt couplings) are handled automatically. For electrical ports, if either one has impedance information, the coupling matrix is approximated by reflection and transmission coefficients derived from both port impedances. For optical ports and electrical ports without impedance information, an :class:`EMEModel` is used to calculate the butt coupling coefficients. Args: mesh_refinement: Minimal number of mesh elements per wavelength used for mode solving. verbose: Flag setting the verbosity of mode solver runs. See also: - :func:`Component.get_netlist` - :attr:`PortSpec.impedance` - `Circuit Model guide <../guides/Circuit_Model.ipynb>`__ """ def __init__( self, mesh_refinement: PositiveFloat | None = None, verbose: bool = True, ) -> None: super().__init__(mesh_refinement=mesh_refinement, verbose=verbose) @property def mesh_refinement(self) -> PositiveFloat | None: return self.parametric_kwargs["mesh_refinement"] @mesh_refinement.setter def mesh_refinement(self, value: PositiveFloat | None) -> None: self.parametric_kwargs["mesh_refinement"] = value @property def verbose(self) -> bool: return self.parametric_kwargs["verbose"] @verbose.setter def verbose(self, value: bool) -> None: self.parametric_kwargs["verbose"] = value
[docs] @cache_s_matrix def start( self, component: Component, frequencies: Sequence[float], updates: dict[Sequence[str | int | None], dict[str, dict[str, Any]]] | None = None, chain_technology_updates: bool = True, verbose: bool | None = None, cost_estimation: bool = False, **kwargs: object, ) -> _CircuitModelRunner: """Start computing the S matrix response from a component. Args: component: Component from which to compute the S matrix. frequencies: Sequence of frequencies at which to perform the computation. updates: Dictionary of parameter updates to be applied to components, technologies, and models for references within the main component. See below for further information. chain_technology_updates: if set, a technology update will trigger an update for all components using that technology. verbose: If set, overrides the model's ``verbose`` attribute and is passed to reference models. cost_estimation: If set, Tidy3D simulations are uploaded, but not executed. S matrix will *not* be computed. **kwargs: Keyword arguments passed to reference models. Returns: Result object with attributes ``status`` and ``s_matrix``. The ``'updates'`` dictionary contains keyword arguments for the :func:`Reference.update` function for the references in the component dependency tree, such that, when the S parameter of a specific reference are computed, that reference can be updated without affecting others using the same component. Each key in the dictionary is used as a reference specification. It must be a tuple with any number of the following: - ``name: str | re.Pattern``: selects any reference whose component name matches the given regex. - ``i: int``, directly following ``name``: limits the selection to ``reference[i]`` from the list of references matching the name. A negative value will match all list items. Note that each repetiton in a reference array counts as a single element in the list. - ``None``: matches any reference at any depth. Examples: >>> updates = { ... # Apply component updates to the first "ARM" reference in ... # the main component ... ("ARM", 0): {"component_updates": {"radius": 10}} ... # Apply model updates to the second "BEND" reference under ... # any "SUB" references in the main component ... ("SUB", "BEND", 1): {"model_updates": {"verbose": False}} ... # Apply technology updates to references with component name ... # starting with "COMP_" prefix, at any subcomponent depth ... (None, "COMP.*"): {"technology_updates": {"thickness": 0.3}} ... } >>> s_matrix = component.s_matrix( ... frequencies, model_kwargs={"updates": updates} ... ) See also: - `Circuit Model guide <../guides/Circuit_Model.ipynb>`__ - `Cascaded Rings Filter example <../examples/Cascaded_Rings_Filter.ipynb>`__ """ if updates is None: updates = {} if verbose is None: verbose = self.verbose s_matrix_kwargs = {} else: s_matrix_kwargs = {"verbose": verbose} if cost_estimation: s_matrix_kwargs["cost_estimation"] = cost_estimation frequencies = numpy.array(frequencies, dtype=float, ndmin=1) runners, _, component_ports, port_connections, connections, instance_port_data, _ = ( _process_component_netlist( component, frequencies, self.mesh_refinement, {}, updates, chain_technology_updates, verbose, cost_estimation, kwargs, s_matrix_kwargs, None, ) ) return _CircuitModelRunner( runners, frequencies, component.name, component_ports, port_connections, connections, instance_port_data, cost_estimation, )
# Deprecated: kept for backwards compatibility with old phf files
[docs] @classmethod def from_bytes(cls, byte_repr: bytes) -> "CircuitModel": """De-serialize this model.""" (version, verbose, mesh_refinement) = struct.unpack("<B?d", byte_repr) if version != 0: raise RuntimeError("Unsuported CircuitModel version.") if mesh_refinement <= 0: mesh_refinement = None return cls(mesh_refinement, verbose)
def _subpath_length(path: Path, u0: float, u1: float, npts: int) -> float: vecs = numpy.diff([path.at(u, output="position") for u in numpy.linspace(u0, u1, npts)], axis=0) return ((vecs[:, 0] ** 2 + vecs[:, 1] ** 2) ** 0.5).sum() _coupler_model = Tidy3DModel() _arms_model = Tidy3DModel() _circuit_model = CircuitModel()
[docs] class DirectionalCouplerCircuitModel(Model): """Model for directional couplers based on circuit decomposition. The coupler is subdivided into 5 parts: the 4 arms and the coupling region. Each is simulated separately, and the final S parameters are computed by a :class:`CircuitModel`. The component geometry is expected to be composed of at least 2 paths connecting 2 ports each. The paths must start far apart, come close together in the coupling region, and separate again. The distance between paths is used to determine the extents of the arms and coupling region. Args: coupling_region_model: Model used to compute the S parameters in the coupling region. arms_model: Model used for the arms. A dictionary mapping port names to models can be used to set a specific models for each arm. circuit_model: Model used to merge all component parts. """ def __init__( self, *, coupling_region_model: Model = _coupler_model, arms_model: Model | dict[str, Model] = _arms_model, circuit_model: Model = _circuit_model, ) -> None: super().__init__( coupling_region_model=coupling_region_model, arms_model=arms_model, circuit_model=circuit_model, )
[docs] def get_circuit( self, component: Component, classification: Literal["optical", "electrical"] = "optical" ) -> Component: """Build the circuit equivalent for a component. Args: component: Component to subdivide into arms and coupling region. classification: Frequency classification to use. Returns: Subdivided component. """ ports = sorted(component.select_ports(classification).items(), key=lambda x: x[1].center[0]) if len(ports) != 4: raise RuntimeError( f"Component {component.name!r} must have 4 {classification} ports to use " f"'DirectionalCouplerCircuitModel'." ) if not all(isinstance(port, Port) for _, port in ports): raise RuntimeError(f"All ports in {component.name!r} must be 'Port' instances.") structures = component.structures # Expect the following port configuration: # P1 _ _ P3 # \_/ # P0 ----- P2 (n0, p0), (n1, p1), (n2, p2), (n3, p3) = ports if p0.center[1] > p1.center[1]: p0, p1 = p1, p0 n0, n1 = n1, n0 path_profiles = p0.spec.path_profiles_list() if len(path_profiles) == 0: raise RuntimeError( "Path profiles must not be empty to use an 'DirectionalCouplerCircuitModel'." ) layer0 = path_profiles[0][2] # Find path0 that goes from p0 to p2 path0 = None for s in structures[layer0]: if not isinstance(s, Path): continue start = s.at(0, output="position") end = s.at(s.size, output="position") if numpy.allclose(p0.center, start): if numpy.allclose(p2.center, end): path0 = s inverted0 = False break elif numpy.allclose(p1.center, end): p1, p2 = p2, p1 n1, n2 = n2, n1 path0 = s inverted0 = False break elif numpy.allclose(p3.center, end): p3, p2 = p2, p3 n3, n2 = n2, n3 path0 = s inverted0 = False break elif numpy.allclose(p0.center, end): if numpy.allclose(p2.center, start): path0 = s inverted0 = True break elif numpy.allclose(p1.center, start): p1, p2 = p2, p1 n1, n2 = n2, n1 path0 = s inverted0 = True break elif numpy.allclose(p3.center, start): p3, p2 = p2, p3 n3, n2 = n2, n3 path0 = s inverted0 = True break if path0 is None: raise RuntimeError( f"Unable to find a path in layer {layer0} conneting port {n0!r} to any other port " f"in component {component.name!r}." ) v_axis = p2.center - p0.center axis = 0 if abs(v_axis[0]) >= abs(v_axis[1]) else 1 if p1.center[axis] > p3.center[axis]: p3, p1 = p1, p3 n3, n1 = n1, n3 # Find path1 from p3 to p1 path_profiles = p3.spec.path_profiles_list() if len(path_profiles) == 0: raise RuntimeError( "Path profiles must not be empty to use an 'DirectionalCouplerCircuitModel'." ) layer1 = path_profiles[0][2] path1 = None for s in structures[layer1]: if not isinstance(s, Path): continue start = s.at(0, output="position") end = s.at(s.size, output="position") if numpy.allclose(p3.center, start) and numpy.allclose(p1.center, end): path1 = s inverted1 = False break elif numpy.allclose(p1.center, start) and numpy.allclose(p3.center, end): path1 = s inverted1 = True break if path1 is None: raise RuntimeError( f"Unable to find a path in layer {layer1} conneting ports {n1!r} and {n3!r} in " f"component {component.name!r}." ) offset0 = p0.spec.width * 2**-0.5 offset1 = p3.spec.width * 2**-0.5 if inverted0: offset0 = -offset0 if inverted1: offset1 = -offset1 v_axis = 0.5 * (p2.center - p0.center + p3.center - p1.center) v_norm = 0.5 * (p1.center - p0.center + p3.center - p2.center) if v_axis[0] * v_norm[1] - v_axis[1] * v_norm[0] < 0: # if the normal is to the right of the propagation axis, invert the offsets offset0 = -offset0 offset1 = -offset1 size0 = path0.size npts0 = path0.spine().shape[0] size1 = path1.size npts1 = path1.spine().shape[0] # Find crossings between path0 and path1 with respective offsets def distance_sq(u: numpy.ndarray) -> float: x0, _, _, g0 = path0.at(u[0] * size0) x1, _, _, g1 = path1.at(u[1] * size1) n0 = numpy.array([-g0[1], g0[0]]) * (g0[0] ** 2 + g0[1] ** 2) ** -0.5 n1 = numpy.array([-g1[1], g1[0]]) * (g1[0] ** 2 + g1[1] ** 2) ** -0.5 x0 += n0 * offset0 x1 += n1 * offset1 v = x1 - x0 return v[0] ** 2 + v[1] ** 2 minimize_kwargs = {"method": "Nelder-Mead", "options": {"adaptive": False, "maxiter": 500}} bounds = [(0.5, 1.0) if inverted0 else (0.0, 0.5), (0.0, 0.5) if inverted1 else (0.5, 1.0)] x0 = [bounds[0][0] + 0.25, bounds[1][0] + 0.25] res = minimize(distance_sq, x0=x0, bounds=bounds, **minimize_kwargs) if not res.success and res.fun > (2 * config.tolerance) ** 2: raise RuntimeError( f"Unable to find start of coupling region from path intersections in component " f"{component.name!r}." ) u = res.x[0] * size0 center0, _, _, grad = path0.at(u) angle0 = numpy.arctan2(grad[1], grad[0]) / numpy.pi * 180.0 length0 = _subpath_length(path0, u, size0 if inverted0 else 0.0, npts0) u = res.x[1] * size1 center1, _, _, grad = path1.at(u) angle1 = numpy.arctan2(grad[1], grad[0]) / numpy.pi * 180.0 length1 = _subpath_length(path1, u, 0.0 if inverted1 else size1, npts1) bounds = [(0.0, 0.5) if inverted0 else (0.5, 1.0), (0.5, 1.0) if inverted1 else (0.0, 0.5)] # x0 = [1.0 - res.x[0], 1.0 - res.x[1]] x0 = [bounds[0][0] + 0.25, bounds[1][0] + 0.25] res = minimize(distance_sq, x0=x0, bounds=bounds, **minimize_kwargs) if not res.success and res.fun > (2 * config.tolerance) ** 2: raise RuntimeError( f"Unable to find start of coupling region from path intersections in component " f"{component.name!r}." ) u = res.x[0] * size0 center2, _, _, grad = path0.at(u) angle2 = numpy.arctan2(grad[1], grad[0]) / numpy.pi * 180.0 length2 = _subpath_length(path0, u, 0.0 if inverted0 else size0, npts0) u = res.x[1] * size1 center3, _, _, grad = path1.at(u) angle3 = numpy.arctan2(grad[1], grad[0]) / numpy.pi * 180.0 length3 = _subpath_length(path1, u, size1 if inverted1 else 0.0, npts1) if inverted0: angle0 += 180 else: angle2 += 180 if inverted1: angle3 += 180 else: angle1 += 180 p = self.parametric_kwargs arms_model = p["arms_model"] if isinstance(arms_model, Model): arms_model = {n0: arms_model, n1: arms_model, n2: arms_model, n3: arms_model} coupler = Component(f"Coupling region - {component.name}", technology=component.technology) offset = max(abs(offset0), abs(offset1)) arms = [] for i, (name, port, center, angle, length) in enumerate( [ (n0, p0, center0, angle0, length0), (n1, p1, center1, angle1, length1), (n2, p2, center2, angle2, length2), (n3, p3, center3, angle3, length3), ] ): coupler.add_port(Port(center, angle, port.spec, extended=False), name) c = Component(f"Arm {i} - {component.name}", technology=component.technology) c.add_port(port, name) c.add_port(Port(center, angle - 180, port.spec.inverted(), extended=False), f"X{i}") # Get bounds from ports only; we don't want the original geometry bounds a, b = c.bounds() bounds = [a - offset, b + offset] arm_model = arms_model.get(name, _arms_model) if ( isinstance(arm_model, WaveguideModel) and arm_model.parametric_kwargs["length"] is None ): arm_model = arm_model.__copy__().update(length=length) elif isinstance(arm_model, Tidy3DModel): b = arm_model.parametric_kwargs["bounds"] if all(b[i][j] is None for i in (0, 1) for j in (0, 1)): arm_model = arm_model.__copy__().update( bounds=[(*bounds[0], b[0][2]), (*bounds[1], b[1][2])] ) c.add_model(arm_model) c.add(component) arms.append(c) # Get bounds from ports only; we don't want the original geometry bounds a, b = coupler.bounds() bounds = [a - offset, b + offset] # If the new ports are too close to the original ones, we must make sure to extend them for # a correct simulation port_extension = -component.size().max() coupler.add(component) for port in (p0, p1, p2, p3): endpoint = port_extension * numpy.exp(1j * port.input_direction / 180.0 * numpy.pi) for layer, path in port.spec.get_paths(port.center): coupler.add(layer, path.segment(endpoint, relative=True)) coupler_model = p["coupling_region_model"] if isinstance(coupler_model, Tidy3DModel): b = coupler_model.parametric_kwargs["bounds"] if all(b[i][j] is None for i in (0, 1) for j in (0, 1)): coupler_model = coupler_model.__copy__().update( bounds=[(*bounds[0], b[0][2]), (*bounds[1], b[1][2])] ) coupler.add_model(coupler_model) circuit = Component(f"Circuit - {component.name}", technology=component.technology) circuit.add(*arms, coupler) circuit.add_virtual_connection_by_instance(0, "X0", 4, n0) circuit.add_virtual_connection_by_instance(1, "X1", 4, n1) circuit.add_virtual_connection_by_instance(2, "X2", 4, n2) circuit.add_virtual_connection_by_instance(3, "X3", 4, n3) circuit.add_port(p0, n0) circuit.add_port(p1, n1) circuit.add_port(p2, n2) circuit.add_port(p3, n3) circuit.add_model(p["circuit_model"]) return circuit
[docs] @cache_s_matrix def start(self, component: Component, frequencies: Sequence[float], **kwargs: object) -> object: """Start computing the S matrix response from a component. Args: component: Component from which to compute the S matrix. frequencies: Sequence of frequencies at which to perform the computation. **kwargs: Keyword arguments passed to sub-models. Returns: Result object with attributes ``status`` and ``s_matrix``. """ circuit = self.get_circuit(component, frequency_classification(frequencies)) return circuit.active_model.start(circuit, frequencies, **kwargs)
register_model_class(CircuitModel) register_model_class(DirectionalCouplerCircuitModel)