import struct
import threading
import time
from collections.abc import Sequence
from typing import Any, Literal
import numpy
from scipy.optimize import minimize
from ..cache import _mode_overlap_cache, cache_s_matrix
from ..circuit_base import _process_component_netlist
from ..extension import (
Component,
Model,
Path,
Port,
SMatrix,
_connect_s_matrices,
config,
frequency_classification,
register_model_class,
)
from ..typing import PositiveFloat
from ..utils import _align_and_overlap, _gather_status
from .tidy3d import Tidy3DModel
from .waveguide import WaveguideModel
class _CircuitModelRunner:
def __init__(
self,
runners: dict[object, object],
frequencies: Sequence[float],
component_name: str,
ports: dict[str, Port],
port_connections: dict[str, tuple[int, str, int]],
connections: Sequence[tuple[tuple[int, str, int], tuple[int, str, int]]],
instance_port_data: Sequence[tuple[object, object]],
cost_estimation: bool,
) -> None:
self.runners = runners
self.frequencies = frequencies
self.component_name = component_name
self.ports = ports
self.port_connections = port_connections
self.connections = connections
self.instance_port_data = instance_port_data
self.cost_estimation = cost_estimation
self.lock = threading.Lock()
self._s_matrix = None
self._status = {"progress": 0, "message": "running", "tasks": {}}
self.thread = threading.Thread(daemon=True, target=self._run_and_monitor_task)
self.thread.start()
def _run_and_monitor_task(self) -> None:
task_status = _gather_status(*self.runners.values())
w_tasks = 3 * len(task_status["tasks"])
n_ports = len(self.instance_port_data)
n_connections = len(self.connections)
denominator = max(1, w_tasks + n_ports + n_connections)
with self.lock:
self._status = dict(task_status)
self._status["progress"] *= w_tasks / denominator
while task_status["message"] == "running":
time.sleep(0.3)
task_status = _gather_status(*self.runners.values())
with self.lock:
self._status = dict(task_status)
self._status["progress"] *= w_tasks / denominator
if task_status["message"] == "error":
with self.lock:
self._status = task_status
return
with self.lock:
self._status = task_status
if self.cost_estimation:
return
self._status["message"] = "running"
self._status["progress"] *= w_tasks / denominator
s_dict = {}
for index, (instance_ports, instance_keys) in enumerate(self.instance_port_data):
# Check if reference is needed
if instance_ports is None:
continue
runner = self.runners[index]
s_matrix = runner if isinstance(runner, SMatrix) else runner.s_matrix
if s_matrix is None:
with self.lock:
self._status["message"] = "error"
return
# Fix port phases if a rotation is applied
mode_factor = {}
if instance_keys is not None:
for port_name, port in instance_ports:
key = instance_keys.get(port_name)
if key is None:
continue
# Port mode
overlap = _mode_overlap_cache[key]
if overlap is None:
overlap = _align_and_overlap(
self.runners[(index, port_name, 0)].data,
self.runners[(index, port_name, 1)].data,
)[0]
_mode_overlap_cache[key] = overlap
for mode in range(port.num_modes):
mode_factor[f"{port_name}@{mode}"] = overlap[mode]
for (i, j), s_ji in s_matrix.elements.items():
s_dict[(index, i), (index, j)] = (
s_ji * mode_factor.get(i, 1.0) / mode_factor.get(j, 1.0)
)
with self.lock:
self._status["progress"] = 100 * (w_tasks + index + 1) / denominator
s_dict = _connect_s_matrices(s_dict, self.connections, len(self.instance_port_data))
# Build S matrix with desired ports
ports = {
(index, f"{ref_name}@{n}"): f"{port_name}@{n}"
for (index, ref_name, modes), port_name in self.port_connections.items()
for n in range(modes)
}
elements = {
(ports[i], ports[j]): s_ji
for (i, j), s_ji in s_dict.items()
if i in ports and j in ports
}
with self.lock:
self._s_matrix = SMatrix(self.frequencies, elements, self.ports)
self._status["progress"] = 100
self._status["message"] = "success"
@property
def status(self) -> dict[str, object]:
with self.lock:
return self._status
@property
def s_matrix(self) -> SMatrix:
with self.lock:
return self._s_matrix
[docs]
class CircuitModel(Model):
"""Model based on circuit-level S-parameter calculation.
The component is expected to be composed of interconnected references.
Scattering parameters are computed based on the S matrices from all
references and their interconnections.
The S matrix of each reference is calculated based on the active model
of the reference's component. Each calculation is preceded by an update
to the componoent's technology, the component itself, and its active
model by calling :attr:`Reference.update`. They are reset to their
original state after the :func:`CircuitModel.start` function is called.
Keyword arguents in :attr:`Reference.s_matrix_kwargs` will be passed on
to :func:`CircuitModel.start`.
If a reference includes repetitions, it is flattened so that each
instance is called separately.
Connections between incompatible ports (butt couplings) are handled
automatically. For electrical ports, if either one has impedance
information, the coupling matrix is approximated by reflection and
transmission coefficients derived from both port impedances. For optical
ports and electrical ports without impedance information, an
:class:`EMEModel` is used to calculate the butt coupling coefficients.
Args:
mesh_refinement: Minimal number of mesh elements per wavelength used
for mode solving.
verbose: Flag setting the verbosity of mode solver runs.
See also:
- :func:`Component.get_netlist`
- :attr:`PortSpec.impedance`
- `Circuit Model guide <../guides/Circuit_Model.ipynb>`__
"""
def __init__(
self,
mesh_refinement: PositiveFloat | None = None,
verbose: bool = True,
) -> None:
super().__init__(mesh_refinement=mesh_refinement, verbose=verbose)
@property
def mesh_refinement(self) -> PositiveFloat | None:
return self.parametric_kwargs["mesh_refinement"]
@mesh_refinement.setter
def mesh_refinement(self, value: PositiveFloat | None) -> None:
self.parametric_kwargs["mesh_refinement"] = value
@property
def verbose(self) -> bool:
return self.parametric_kwargs["verbose"]
@verbose.setter
def verbose(self, value: bool) -> None:
self.parametric_kwargs["verbose"] = value
[docs]
@cache_s_matrix
def start(
self,
component: Component,
frequencies: Sequence[float],
updates: dict[Sequence[str | int | None], dict[str, dict[str, Any]]] | None = None,
chain_technology_updates: bool = True,
verbose: bool | None = None,
cost_estimation: bool = False,
**kwargs: object,
) -> _CircuitModelRunner:
"""Start computing the S matrix response from a component.
Args:
component: Component from which to compute the S matrix.
frequencies: Sequence of frequencies at which to perform the
computation.
updates: Dictionary of parameter updates to be applied to
components, technologies, and models for references within the
main component. See below for further information.
chain_technology_updates: if set, a technology update will trigger
an update for all components using that technology.
verbose: If set, overrides the model's ``verbose`` attribute and
is passed to reference models.
cost_estimation: If set, Tidy3D simulations are uploaded, but not
executed. S matrix will *not* be computed.
**kwargs: Keyword arguments passed to reference models.
Returns:
Result object with attributes ``status`` and ``s_matrix``.
The ``'updates'`` dictionary contains keyword arguments for the
:func:`Reference.update` function for the references in the component
dependency tree, such that, when the S parameter of a specific reference
are computed, that reference can be updated without affecting others
using the same component.
Each key in the dictionary is used as a reference specification. It must
be a tuple with any number of the following:
- ``name: str | re.Pattern``: selects any reference whose component name
matches the given regex.
- ``i: int``, directly following ``name``: limits the selection to
``reference[i]`` from the list of references matching the name. A
negative value will match all list items. Note that each repetiton in
a reference array counts as a single element in the list.
- ``None``: matches any reference at any depth.
Examples:
>>> updates = {
... # Apply component updates to the first "ARM" reference in
... # the main component
... ("ARM", 0): {"component_updates": {"radius": 10}}
... # Apply model updates to the second "BEND" reference under
... # any "SUB" references in the main component
... ("SUB", "BEND", 1): {"model_updates": {"verbose": False}}
... # Apply technology updates to references with component name
... # starting with "COMP_" prefix, at any subcomponent depth
... (None, "COMP.*"): {"technology_updates": {"thickness": 0.3}}
... }
>>> s_matrix = component.s_matrix(
... frequencies, model_kwargs={"updates": updates}
... )
See also:
- `Circuit Model guide <../guides/Circuit_Model.ipynb>`__
- `Cascaded Rings Filter example
<../examples/Cascaded_Rings_Filter.ipynb>`__
"""
if updates is None:
updates = {}
if verbose is None:
verbose = self.verbose
s_matrix_kwargs = {}
else:
s_matrix_kwargs = {"verbose": verbose}
if cost_estimation:
s_matrix_kwargs["cost_estimation"] = cost_estimation
frequencies = numpy.array(frequencies, dtype=float, ndmin=1)
runners, _, component_ports, port_connections, connections, instance_port_data, _ = (
_process_component_netlist(
component,
frequencies,
self.mesh_refinement,
{},
updates,
chain_technology_updates,
verbose,
cost_estimation,
kwargs,
s_matrix_kwargs,
None,
)
)
return _CircuitModelRunner(
runners,
frequencies,
component.name,
component_ports,
port_connections,
connections,
instance_port_data,
cost_estimation,
)
# Deprecated: kept for backwards compatibility with old phf files
[docs]
@classmethod
def from_bytes(cls, byte_repr: bytes) -> "CircuitModel":
"""De-serialize this model."""
(version, verbose, mesh_refinement) = struct.unpack("<B?d", byte_repr)
if version != 0:
raise RuntimeError("Unsuported CircuitModel version.")
if mesh_refinement <= 0:
mesh_refinement = None
return cls(mesh_refinement, verbose)
def _subpath_length(path: Path, u0: float, u1: float, npts: int) -> float:
vecs = numpy.diff([path.at(u, output="position") for u in numpy.linspace(u0, u1, npts)], axis=0)
return ((vecs[:, 0] ** 2 + vecs[:, 1] ** 2) ** 0.5).sum()
_coupler_model = Tidy3DModel()
_arms_model = Tidy3DModel()
_circuit_model = CircuitModel()
[docs]
class DirectionalCouplerCircuitModel(Model):
"""Model for directional couplers based on circuit decomposition.
The coupler is subdivided into 5 parts: the 4 arms and the coupling
region. Each is simulated separately, and the final S parameters are
computed by a :class:`CircuitModel`.
The component geometry is expected to be composed of at least 2 paths
connecting 2 ports each. The paths must start far apart, come close
together in the coupling region, and separate again. The distance
between paths is used to determine the extents of the arms and coupling
region.
Args:
coupling_region_model: Model used to compute the S parameters in the
coupling region.
arms_model: Model used for the arms. A dictionary mapping port names
to models can be used to set a specific models for each arm.
circuit_model: Model used to merge all component parts.
"""
def __init__(
self,
*,
coupling_region_model: Model = _coupler_model,
arms_model: Model | dict[str, Model] = _arms_model,
circuit_model: Model = _circuit_model,
) -> None:
super().__init__(
coupling_region_model=coupling_region_model,
arms_model=arms_model,
circuit_model=circuit_model,
)
[docs]
def get_circuit(
self, component: Component, classification: Literal["optical", "electrical"] = "optical"
) -> Component:
"""Build the circuit equivalent for a component.
Args:
component: Component to subdivide into arms and coupling region.
classification: Frequency classification to use.
Returns:
Subdivided component.
"""
ports = sorted(component.select_ports(classification).items(), key=lambda x: x[1].center[0])
if len(ports) != 4:
raise RuntimeError(
f"Component {component.name!r} must have 4 {classification} ports to use "
f"'DirectionalCouplerCircuitModel'."
)
if not all(isinstance(port, Port) for _, port in ports):
raise RuntimeError(f"All ports in {component.name!r} must be 'Port' instances.")
structures = component.structures
# Expect the following port configuration:
# P1 _ _ P3
# \_/
# P0 ----- P2
(n0, p0), (n1, p1), (n2, p2), (n3, p3) = ports
if p0.center[1] > p1.center[1]:
p0, p1 = p1, p0
n0, n1 = n1, n0
path_profiles = p0.spec.path_profiles_list()
if len(path_profiles) == 0:
raise RuntimeError(
"Path profiles must not be empty to use an 'DirectionalCouplerCircuitModel'."
)
layer0 = path_profiles[0][2]
# Find path0 that goes from p0 to p2
path0 = None
for s in structures[layer0]:
if not isinstance(s, Path):
continue
start = s.at(0, output="position")
end = s.at(s.size, output="position")
if numpy.allclose(p0.center, start):
if numpy.allclose(p2.center, end):
path0 = s
inverted0 = False
break
elif numpy.allclose(p1.center, end):
p1, p2 = p2, p1
n1, n2 = n2, n1
path0 = s
inverted0 = False
break
elif numpy.allclose(p3.center, end):
p3, p2 = p2, p3
n3, n2 = n2, n3
path0 = s
inverted0 = False
break
elif numpy.allclose(p0.center, end):
if numpy.allclose(p2.center, start):
path0 = s
inverted0 = True
break
elif numpy.allclose(p1.center, start):
p1, p2 = p2, p1
n1, n2 = n2, n1
path0 = s
inverted0 = True
break
elif numpy.allclose(p3.center, start):
p3, p2 = p2, p3
n3, n2 = n2, n3
path0 = s
inverted0 = True
break
if path0 is None:
raise RuntimeError(
f"Unable to find a path in layer {layer0} conneting port {n0!r} to any other port "
f"in component {component.name!r}."
)
v_axis = p2.center - p0.center
axis = 0 if abs(v_axis[0]) >= abs(v_axis[1]) else 1
if p1.center[axis] > p3.center[axis]:
p3, p1 = p1, p3
n3, n1 = n1, n3
# Find path1 from p3 to p1
path_profiles = p3.spec.path_profiles_list()
if len(path_profiles) == 0:
raise RuntimeError(
"Path profiles must not be empty to use an 'DirectionalCouplerCircuitModel'."
)
layer1 = path_profiles[0][2]
path1 = None
for s in structures[layer1]:
if not isinstance(s, Path):
continue
start = s.at(0, output="position")
end = s.at(s.size, output="position")
if numpy.allclose(p3.center, start) and numpy.allclose(p1.center, end):
path1 = s
inverted1 = False
break
elif numpy.allclose(p1.center, start) and numpy.allclose(p3.center, end):
path1 = s
inverted1 = True
break
if path1 is None:
raise RuntimeError(
f"Unable to find a path in layer {layer1} conneting ports {n1!r} and {n3!r} in "
f"component {component.name!r}."
)
offset0 = p0.spec.width * 2**-0.5
offset1 = p3.spec.width * 2**-0.5
if inverted0:
offset0 = -offset0
if inverted1:
offset1 = -offset1
v_axis = 0.5 * (p2.center - p0.center + p3.center - p1.center)
v_norm = 0.5 * (p1.center - p0.center + p3.center - p2.center)
if v_axis[0] * v_norm[1] - v_axis[1] * v_norm[0] < 0:
# if the normal is to the right of the propagation axis, invert the offsets
offset0 = -offset0
offset1 = -offset1
size0 = path0.size
npts0 = path0.spine().shape[0]
size1 = path1.size
npts1 = path1.spine().shape[0]
# Find crossings between path0 and path1 with respective offsets
def distance_sq(u: numpy.ndarray) -> float:
x0, _, _, g0 = path0.at(u[0] * size0)
x1, _, _, g1 = path1.at(u[1] * size1)
n0 = numpy.array([-g0[1], g0[0]]) * (g0[0] ** 2 + g0[1] ** 2) ** -0.5
n1 = numpy.array([-g1[1], g1[0]]) * (g1[0] ** 2 + g1[1] ** 2) ** -0.5
x0 += n0 * offset0
x1 += n1 * offset1
v = x1 - x0
return v[0] ** 2 + v[1] ** 2
minimize_kwargs = {"method": "Nelder-Mead", "options": {"adaptive": False, "maxiter": 500}}
bounds = [(0.5, 1.0) if inverted0 else (0.0, 0.5), (0.0, 0.5) if inverted1 else (0.5, 1.0)]
x0 = [bounds[0][0] + 0.25, bounds[1][0] + 0.25]
res = minimize(distance_sq, x0=x0, bounds=bounds, **minimize_kwargs)
if not res.success and res.fun > (2 * config.tolerance) ** 2:
raise RuntimeError(
f"Unable to find start of coupling region from path intersections in component "
f"{component.name!r}."
)
u = res.x[0] * size0
center0, _, _, grad = path0.at(u)
angle0 = numpy.arctan2(grad[1], grad[0]) / numpy.pi * 180.0
length0 = _subpath_length(path0, u, size0 if inverted0 else 0.0, npts0)
u = res.x[1] * size1
center1, _, _, grad = path1.at(u)
angle1 = numpy.arctan2(grad[1], grad[0]) / numpy.pi * 180.0
length1 = _subpath_length(path1, u, 0.0 if inverted1 else size1, npts1)
bounds = [(0.0, 0.5) if inverted0 else (0.5, 1.0), (0.5, 1.0) if inverted1 else (0.0, 0.5)]
# x0 = [1.0 - res.x[0], 1.0 - res.x[1]]
x0 = [bounds[0][0] + 0.25, bounds[1][0] + 0.25]
res = minimize(distance_sq, x0=x0, bounds=bounds, **minimize_kwargs)
if not res.success and res.fun > (2 * config.tolerance) ** 2:
raise RuntimeError(
f"Unable to find start of coupling region from path intersections in component "
f"{component.name!r}."
)
u = res.x[0] * size0
center2, _, _, grad = path0.at(u)
angle2 = numpy.arctan2(grad[1], grad[0]) / numpy.pi * 180.0
length2 = _subpath_length(path0, u, 0.0 if inverted0 else size0, npts0)
u = res.x[1] * size1
center3, _, _, grad = path1.at(u)
angle3 = numpy.arctan2(grad[1], grad[0]) / numpy.pi * 180.0
length3 = _subpath_length(path1, u, size1 if inverted1 else 0.0, npts1)
if inverted0:
angle0 += 180
else:
angle2 += 180
if inverted1:
angle3 += 180
else:
angle1 += 180
p = self.parametric_kwargs
arms_model = p["arms_model"]
if isinstance(arms_model, Model):
arms_model = {n0: arms_model, n1: arms_model, n2: arms_model, n3: arms_model}
coupler = Component(f"Coupling region - {component.name}", technology=component.technology)
offset = max(abs(offset0), abs(offset1))
arms = []
for i, (name, port, center, angle, length) in enumerate(
[
(n0, p0, center0, angle0, length0),
(n1, p1, center1, angle1, length1),
(n2, p2, center2, angle2, length2),
(n3, p3, center3, angle3, length3),
]
):
coupler.add_port(Port(center, angle, port.spec, extended=False), name)
c = Component(f"Arm {i} - {component.name}", technology=component.technology)
c.add_port(port, name)
c.add_port(Port(center, angle - 180, port.spec.inverted(), extended=False), f"X{i}")
# Get bounds from ports only; we don't want the original geometry bounds
a, b = c.bounds()
bounds = [a - offset, b + offset]
arm_model = arms_model.get(name, _arms_model)
if (
isinstance(arm_model, WaveguideModel)
and arm_model.parametric_kwargs["length"] is None
):
arm_model = arm_model.__copy__().update(length=length)
elif isinstance(arm_model, Tidy3DModel):
b = arm_model.parametric_kwargs["bounds"]
if all(b[i][j] is None for i in (0, 1) for j in (0, 1)):
arm_model = arm_model.__copy__().update(
bounds=[(*bounds[0], b[0][2]), (*bounds[1], b[1][2])]
)
c.add_model(arm_model)
c.add(component)
arms.append(c)
# Get bounds from ports only; we don't want the original geometry bounds
a, b = coupler.bounds()
bounds = [a - offset, b + offset]
# If the new ports are too close to the original ones, we must make sure to extend them for
# a correct simulation
port_extension = -component.size().max()
coupler.add(component)
for port in (p0, p1, p2, p3):
endpoint = port_extension * numpy.exp(1j * port.input_direction / 180.0 * numpy.pi)
for layer, path in port.spec.get_paths(port.center):
coupler.add(layer, path.segment(endpoint, relative=True))
coupler_model = p["coupling_region_model"]
if isinstance(coupler_model, Tidy3DModel):
b = coupler_model.parametric_kwargs["bounds"]
if all(b[i][j] is None for i in (0, 1) for j in (0, 1)):
coupler_model = coupler_model.__copy__().update(
bounds=[(*bounds[0], b[0][2]), (*bounds[1], b[1][2])]
)
coupler.add_model(coupler_model)
circuit = Component(f"Circuit - {component.name}", technology=component.technology)
circuit.add(*arms, coupler)
circuit.add_virtual_connection_by_instance(0, "X0", 4, n0)
circuit.add_virtual_connection_by_instance(1, "X1", 4, n1)
circuit.add_virtual_connection_by_instance(2, "X2", 4, n2)
circuit.add_virtual_connection_by_instance(3, "X3", 4, n3)
circuit.add_port(p0, n0)
circuit.add_port(p1, n1)
circuit.add_port(p2, n2)
circuit.add_port(p3, n3)
circuit.add_model(p["circuit_model"])
return circuit
[docs]
@cache_s_matrix
def start(self, component: Component, frequencies: Sequence[float], **kwargs: object) -> object:
"""Start computing the S matrix response from a component.
Args:
component: Component from which to compute the S matrix.
frequencies: Sequence of frequencies at which to perform the
computation.
**kwargs: Keyword arguments passed to sub-models.
Returns:
Result object with attributes ``status`` and ``s_matrix``.
"""
circuit = self.get_circuit(component, frequency_classification(frequencies))
return circuit.active_model.start(circuit, frequencies, **kwargs)
register_model_class(CircuitModel)
register_model_class(DirectionalCouplerCircuitModel)