Source code for

"""Monitor Level Data, store the DataArrays associated with a single monitor."""

from __future__ import annotations

import warnings
from abc import ABC
from typing import Any, Callable, Dict, List, Tuple, Union

import autograd.numpy as np
import pydantic.v1 as pd
import xarray as xr
from pandas import DataFrame

from ...constants import C_0, ETA_0, MICROMETER, MU_0
from ...exceptions import DataError, SetupError, Tidy3dNotImplementedError, ValidationError
from ...log import log
from ..base import TYPE_TAG_STR, cached_property, skip_if_fields_missing
from import AbstractMonitorData
from ..geometry.base import Box
from ..grid.grid import Coords, Grid
from ..medium import Medium, MediumType
from ..monitor import (
from ..source import (
from ..types import (
from ..validators import enforce_monitor_fields_present, required_if_symmetry_present
from .data_array import (
from .dataset import (

Coords1D = ArrayFloat1D

# how much to shift the adjoint field source for 0-D axes dimensions

[docs] class MonitorData(AbstractMonitorData, ABC): """ Abstract base class of objects that store data pertaining to a single :class:`.monitor`. """ monitor: MonitorType = pd.Field( ..., title="Monitor", description="Monitor associated with the data.", discriminator=TYPE_TAG_STR, ) @property def symmetry_expanded(self) -> MonitorData: """Return self with symmetry applied.""" return self
[docs] def normalize(self, source_spectrum_fn: Callable[[float], complex]) -> Dataset: """Return copy of self after normalization is applied using source spectrum function.""" return self.copy()
def _updated(self, update: Dict) -> MonitorData: """Similar to ``updated_copy``, but does not actually copy components, for speed. Note ---- This does **not** produce a copy of mutable objects, so e.g. if some of the data arrays are not updated, they will point to the values in the original data. This method should thus be used carefully. """ data_dict = self.dict() data_dict.update(update) return type(self).parse_obj(data_dict)
[docs] def make_adjoint_sources(self, dataset_names: list[str], fwidth: float) -> list[Source]: """Generate adjoint sources for this ``MonitorData`` instance.""" # TODO: if there's data in the MonitorData, but no adjoint source, then # user is trying to differentiate something that is un-supported by us # warn? return []
[docs] @staticmethod def flip_direction(direction: Union[str, DataArray]) -> str: """Flip the direction of a string ``('+', '-') -> ('-', '+')``.""" if isinstance(direction, DataArray): direction = str(direction.values) if direction not in ("+", "-"): raise ValueError(f"Direction must be in {('+', '-')}, got '{direction}'.") return "-" if direction == "+" else "+"
[docs] @staticmethod def get_amplitude(x) -> complex: """Get the complex amplitude out of some data.""" if isinstance(x, DataArray): x = complex(x.values) return 1j * complex(x)
class AbstractFieldData(MonitorData, AbstractFieldDataset, ABC): """Collection of scalar fields with some symmetry properties.""" monitor: Union[FieldMonitor, FieldTimeMonitor, PermittivityMonitor, ModeMonitor] symmetry: Tuple[Symmetry, Symmetry, Symmetry] = pd.Field( (0, 0, 0), title="Symmetry", description="Symmetry eigenvalues of the original simulation in x, y, and z.", ) symmetry_center: Coordinate = pd.Field( None, title="Symmetry Center", description="Center of the symmetry planes of the original simulation in x, y, and z. " "Required only if any of the ``symmetry`` field are non-zero.", ) grid_expanded: Grid = pd.Field( None, title="Expanded Grid", description=":class:`.Grid` discretization of the associated monitor in the simulation " "which created the data. Required if symmetries are present, as " "well as in order to use some functionalities like getting Poynting vector and flux.", ) @pd.validator("grid_expanded", always=True) def warn_missing_grid_expanded(cls, val, values): """If ``grid_expanded`` not provided and fields data is present, warn that some methods will break.""" field_comps = ["Ex", "Ey", "Ez", "Hx", "Hy", "Hz"] if val is None and any(values.get(comp) is not None for comp in field_comps): log.warning( "Monitor data requires 'grid_expanded' to be defined to compute values like " "flux, Poynting and dot product with other data." ) return val _require_sym_center = required_if_symmetry_present("symmetry_center") _require_grid_expanded = required_if_symmetry_present("grid_expanded") def _expanded_grid_field_coords(self, field_name: str) -> Coords: """Coordinates in the expanded grid corresponding to a given field component.""" return self.grid_expanded[self.grid_locations[field_name]] @property def symmetry_expanded(self): """Return the :class:`.AbstractFieldData` with fields expanded based on symmetry. If any symmetry is nonzero (i.e. expanded), the interpolation implicitly creates a copy of the data array. However, if symmetry is not expanded, the returned array contains a view of the data, not a copy. Returns ------- :class:`AbstractFieldData` A data object with the symmetry expanded fields. """ if all(sym == 0 for sym in self.symmetry): return self return self._updated(self._symmetry_update_dict) @property def symmetry_expanded_copy(self) -> AbstractFieldData: """Create a copy of the :class:`.AbstractFieldData` with fields expanded based on symmetry. Returns ------- :class:`AbstractFieldData` A data object with the symmetry expanded fields. """ if all(sym == 0 for sym in self.symmetry): return self.copy() return self.copy(update=self._symmetry_update_dict) @property def _symmetry_update_dict(self) -> Dict: """Dictionary of data fields to create data with expanded symmetry.""" update_dict = {} for field_name, scalar_data in self.field_components.items(): eigenval_fn = self.symmetry_eigenvalues[field_name] # get grid locations for this field component on the expanded grid field_coords = self._expanded_grid_field_coords(field_name) for sym_dim, (sym_val, sym_loc) in enumerate(zip(self.symmetry, self.symmetry_center)): dim_name = "xyz"[sym_dim] # Continue if no symmetry along this dimension if sym_val == 0: continue # Get coordinates for this field component on the expanded grid coords = field_coords.to_list[sym_dim] coords = self.monitor.downsample(coords, axis=sym_dim) # Get indexes of coords that lie on the left of the symmetry center flip_inds = np.where(coords < sym_loc)[0] # Get the symmetric coordinates on the right coords_interp = np.copy(coords) coords_interp[flip_inds] = 2 * sym_loc - coords[flip_inds] # Interpolate. There generally shouldn't be values out of bounds except potentially # when handling modes, in which case they should be at the boundary and close to 0. scalar_data = scalar_data.sel(**{dim_name: coords_interp}, method="nearest") scalar_data = scalar_data.assign_coords({dim_name: coords}) # apply the symmetry eigenvalue (if defined) to the flipped values if eigenval_fn is not None: sym_eigenvalue = eigenval_fn(sym_dim) scalar_data = scalar_data.multiply_at( value=sym_val * sym_eigenvalue, coord_name=dim_name, indices=flip_inds ) # assign the final scalar data to the update_dict update_dict[field_name] = scalar_data update_dict.update({"symmetry": (0, 0, 0), "symmetry_center": None}) return update_dict def at_coords(self, coords: Coords) -> xr.Dataset: """Colocate data to some supplied coordinates. This is a convenience method that wraps ``colocate``, and skips dimensions for which the data has a single data point only (``colocate`` will error in that case.) If the coords are out of bounds for the data otherwise, an error will still be produced. Parameters ---------- coords : :class:`Coords` Coordinates in x, y and z to colocate to. Returns ------- xarray.Dataset Dataset containing all of the fields in the data interpolated to boundary locations on the Yee grid. """ # pass coords if each of the scalar field data have more than one coordinate along a dim xyz_kwargs = {} for dim, coords_dim in zip("xyz", (coords.x, coords.y, coords.z)): scalar_data = list(self.field_components.values()) coord_lens = [len(data.coords[dim]) for data in scalar_data] if all(ncoords > 1 for ncoords in coord_lens): xyz_kwargs[dim] = coords_dim return self.colocate(**xyz_kwargs)
[docs] class ElectromagneticFieldData(AbstractFieldData, ElectromagneticFieldDataset, ABC): """Collection of electromagnetic fields.""" grid_primal_correction: Union[ float, FreqDataArray, TimeDataArray, FreqModeDataArray, EMEFreqModeDataArray, ] = pd.Field( 1.0, title="Field correction factor", description="Correction factor that needs to be applied for data corresponding to a 2D " "monitor to take into account the finite grid in the normal direction in the simulation in " "which the data was computed. The factor is applied to fields defined on the primal grid " "locations along the normal direction.", ) grid_dual_correction: Union[ float, FreqDataArray, TimeDataArray, FreqModeDataArray, EMEFreqModeDataArray, ] = pd.Field( 1.0, title="Field correction factor", description="Correction factor that needs to be applied for data corresponding to a 2D " "monitor to take into account the finite grid in the normal direction in the simulation in " "which the data was computed. The factor is applied to fields defined on the dual grid " "locations along the normal direction.", ) def _expanded_grid_field_coords(self, field_name: str): """Coordinates in the expanded grid corresponding to a given field component.""" if self.monitor.colocate: bounds_dict = self.grid_expanded.boundaries.to_dict return Coords(**{key: val[:-1] for key, val in bounds_dict.items()}) return self.grid_expanded[self.grid_locations[field_name]] @property def _grid_correction_dict(self): """Return the primal and dual finite grid correction factors as a dictionary.""" return { "grid_primal_correction": self.grid_primal_correction, "grid_dual_correction": self.grid_dual_correction, } @property def _tangential_dims(self) -> List[str]: """For a 2D monitor data, return the names of the tangential dimensions. Raise if cannot confirm that the associated monitor is 2D.""" if len(self.monitor.zero_dims) != 1: raise DataError("Data must be 2D to get tangential dimensions.") tangential_dims = ["x", "y", "z"] tangential_dims.pop(self.monitor.zero_dims[0]) return tangential_dims @property def colocation_boundaries(self) -> Coords: """Coordinates to be used for colocation of the data to grid boundaries.""" if not self.grid_expanded: raise DataError( "Monitor data requires 'grid_expanded' to be defined in order to " "compute colocation coordinates." ) # Get boundaries from the expanded grid grid_bounds = self.grid_expanded.boundaries.to_dict # Non-colocating monitors can only colocate starting from the first boundary # (unless there's a single data point, in which case data has already been snapped). # Regardless of colocation, we also drop the last boundary. colocate_bounds = {} for dim, bounds in grid_bounds.items(): cbs = bounds[:-1] if not self.monitor.colocate and cbs.size > 1: cbs = cbs[1:] colocate_bounds[dim] = cbs return Coords(**colocate_bounds) @property def colocation_centers(self) -> Coords: """Coordinates to be used for colocation of the data to grid centers.""" colocate_centers = {} for dim, coords in self.colocation_boundaries.to_dict.items(): colocate_centers[dim] = (coords[1:] + coords[:-1]) / 2 return Coords(**colocate_centers) @property def _plane_grid_boundaries(self) -> Tuple[Coords1D, Coords1D]: """For a 2D monitor data, return the boundaries of the in-plane grid to be used to compute differential area and to colocate fields if needed.""" if np.any(np.array(self.monitor.interval_space) > 1): raise Tidy3dNotImplementedError( "Cannot determine grid boundaries corresponding to " "down-sampled monitor data ('interval_space' > 1 along a direction)." ) dim1, dim2 = self._tangential_dims bounds_dict = self.colocation_boundaries.to_dict return (bounds_dict[dim1], bounds_dict[dim2]) @property def _plane_grid_centers(self) -> Tuple[Coords1D, Coords1D]: """For 2D monitor data, return the centers of the in-plane grid""" return [(bs[1:] + bs[:-1]) / 2 for bs in self._plane_grid_boundaries] @property def _diff_area(self) -> xr.DataArray: """For a 2D monitor data, return the area of each cell in the plane, for use in numerical integrations. This assumes that data is colocated to grid boundaries, and uses the difference in the surrounding grid centers to compute the area. """ # Monitor values are interpolated to bounds bounds = self._plane_grid_boundaries # Coords to compute cell sizes around the interpolation locations coords = [bs.copy() for bs in self._plane_grid_centers] # Append the first and last boundary _, plane_inds = self.monitor.pop_axis([0, 1, 2], self.monitor.size.index(0.0)) coords[0] = np.array([bounds[0][0]] + coords[0].tolist() + [bounds[0][-1]]) coords[1] = np.array([bounds[1][0]] + coords[1].tolist() + [bounds[1][-1]]) """Truncate coords to monitor boundaries. This implicitly makes extra pixels which may be present have size 0 and so won't be included in the integration. For pixels intersected by the monitor edge, the size is truncated to the part covered by the monitor. When using the differential area sizes defined in this way together with integrand values defined at cell boundaries, the integration is equivalent to trapezoidal rule with the first and last values interpolated to the exact monitor start/end location, if the integrand is zero outside of the monitor geometry. This should usually be the case for flux and dot computations""" mnt_bounds = np.array(self.monitor.bounds) mnt_bounds = mnt_bounds[:, plane_inds].T coords[0][np.argwhere(coords[0] < mnt_bounds[0, 0])] = mnt_bounds[0, 0] coords[0][np.argwhere(coords[0] > mnt_bounds[0, 1])] = mnt_bounds[0, 1] coords[1][np.argwhere(coords[1] < mnt_bounds[1, 0])] = mnt_bounds[1, 0] coords[1][np.argwhere(coords[1] > mnt_bounds[1, 1])] = mnt_bounds[1, 1] # Do not apply the spurious dl along a dimension where the simulation is 2D. # Instead, we just set the boundaries such that the cell size along the zero dimension is 1, # such that quantities like flux will come out in units of W / um. sizes_dim0 = coords[0][1:] - coords[0][:-1] if bounds[0].size > 1 else [1.0] sizes_dim1 = coords[1][1:] - coords[1][:-1] if bounds[1].size > 1 else [1.0] return xr.DataArray(np.outer(sizes_dim0, sizes_dim1), dims=self._tangential_dims) def _tangential_corrected(self, fields: Dict[str, DataArray]) -> Dict[str, DataArray]: """For a 2D monitor data, extract the tangential components from fields and orient them such that the third component would be the normal axis. This just means that the H field gets an extra minus sign if the normal axis is ``"y"``. Raise if any of the tangential field components is missing. The finite grid correction is also applied, so the intended use of these fields is in poynting, flux, and dot-like methods. The normal coordinate is dropped from the field data. """ if len(self.monitor.zero_dims) != 1: raise DataError("Data must be 2D to get tangential fields.") # Tangential field components tan_dims = self._tangential_dims components = [fname + dim for fname in "EH" for dim in tan_dims] normal_dim = "xyz"[self.monitor.size.index(0)] tan_fields = {} for component in components: if component not in fields: raise DataError(f"Tangential field component '{component}' missing in field data.") correction = 1 # sign correction to H if normal_dim == "y" and component[0] == "H": correction *= -1 # finite grid correction to all fields eig_val = self.symmetry_eigenvalues[component](normal_dim) if eig_val < 0: correction *= self.grid_dual_correction else: correction *= self.grid_primal_correction field_squeezed = fields[component].squeeze(dim=normal_dim, drop=True) tan_fields[component] = field_squeezed * correction return tan_fields @property def _tangential_fields(self) -> Dict[str, DataArray]: """For a 2D monitor data, get the tangential E and H fields in the 2D plane grid. Fields are oriented such that the third component would be the normal axis. This just means that the H field gets an extra minus sign if the normal axis is ``"y"``. Note ---- The finite grid correction factors are applied and symmetry is expanded. """ return self._tangential_corrected(self.symmetry_expanded.field_components) @property def _colocated_fields(self) -> Dict[str, DataArray]: """For a 2D monitor data, get all E and H fields colocated to the cell boundaries in the 2D plane grid, with symmetries expanded. """ field_components = self.symmetry_expanded.field_components if self.monitor.colocate: return field_components # Interpolate field components to cell boundaries interp_dict = {"assume_sorted": True} for dim, bounds in zip(self._tangential_dims, self._plane_grid_boundaries): if bounds.size > 1: interp_dict[dim] = bounds colocated_fields = {key: val.interp(**interp_dict) for key, val in field_components.items()} return colocated_fields @property def _colocated_tangential_fields(self) -> Dict[str, DataArray]: """For a 2D monitor data, get the tangential E and H fields colocated to the cell boundaries in the 2D plane grid. Fields are oriented such that the third component would be the normal axis. This just means that the H field gets an extra minus sign if the normal axis is ``"y"``. Raise if any of the tangential field components is missing. Note ---- The finite grid correction factors are applied and symmetry is expanded. """ return self._tangential_corrected(self._colocated_fields) @property def grid_corrected_copy(self) -> ElectromagneticFieldData: """Return a copy of self with grid correction factors applied (if necessary) and symmetry expanded.""" field_data = self.symmetry_expanded_copy if len(self.monitor.zero_dims) != 1: return field_data normal_dim = "xyz"[self.monitor.zero_dims[0]] update = {"grid_primal_correction": 1.0, "grid_dual_correction": 1.0} for field_name, field in field_data.field_components.items(): eig_val = self.symmetry_eigenvalues[field_name](normal_dim) if eig_val < 0: update[field_name] = field * self.grid_dual_correction else: update[field_name] = field * self.grid_primal_correction return field_data.copy(update=update) @property def intensity(self) -> ScalarFieldDataArray: """Return the sum of the squared absolute electric field components.""" normal_dim = "xyz"[self.monitor.size.index(0)] fields = self._colocated_fields components = ("Ex", "Ey", "Ez") if any(cmp not in fields for cmp in components): raise KeyError("Can't compute intensity, all E field components must be present.") intensity = sum(fields[cmp].abs ** 2 for cmp in components) return intensity.squeeze(dim=normal_dim, drop=True) @property def poynting(self) -> ScalarFieldDataArray: """Time-averaged Poynting vector for frequency-domain data associated to a 2D monitor, projected to the direction normal to the monitor plane.""" # Tangential fields are ordered as E1, E2, H1, H2 tan_fields = self._colocated_tangential_fields dim1, dim2 = self._tangential_dims e1 = tan_fields["E" + dim1] e2 = tan_fields["E" + dim2] h1 = tan_fields["H" + dim1] h2 = tan_fields["H" + dim2] e1_h2 = e1 * h2.conj() e2_h1 = e2 * h1.conj() e_x_h_star = e1_h2 - e2_h1 poynting = 0.5 * np.real(e_x_h_star) return poynting
[docs] def package_flux_results(self, flux_values: xr.DataArray) -> Any: """How to package flux""" return FluxDataArray(flux_values)
@cached_property def flux(self) -> FluxDataArray: """Flux for data corresponding to a 2D monitor.""" # Compute flux by integrating Poynting vector in-plane d_area = self._diff_area poynting = self.poynting flux_values = poynting * d_area flux_values = flux_values.sum(dim=d_area.dims) return self.package_flux_results(flux_values) @cached_property def mode_area(self) -> FreqModeDataArray: r"""Effective mode area corresponding to a 2D monitor. .. math: \frac{\left(\int |E|^2 \, {\rm d}S\right)^2}{\int |E|^4 \, {\rm d}S} """ intensity = self.intensity # integrate over the plane d_area = self._diff_area num = (intensity * d_area).sum(dim=d_area.dims) ** 2 den = (intensity**2 * d_area).sum(dim=d_area.dims) area = num / den if hasattr(self.monitor, "mode_spec"): area *= np.cos(self.monitor.mode_spec.angle_theta) return FreqModeDataArray(area)
[docs] def dot( self, field_data: Union[FieldData, ModeData, ModeSolverData], conjugate: bool = True ) -> ModeAmpsDataArray: r"""Dot product (modal overlap) with another :class:`.FieldData` object. Both datasets have to be frequency-domain data associated with a 2D monitor. Along the tangential directions, the datasets have to have the same discretization. Along the normal direction, the monitor position may differ and is ignored. Other coordinates (``frequency``, ``mode_index``) have to be either identical or broadcastable. Broadcasting is also supported in the case in which the other ``field_data`` has a dimension of size 1 whose coordinate is not in the list of coordinates in the ``self`` dataset along the corresponding dimension. In that case, the coordinates of the ``self`` dataset are used in the output. The dot product is defined as: .. math: \frac{1}{4} \int \left( E_0 \times H_1^* + H_0^* \times E_1 \) \, {\rm d}S Parameters ---------- field_data : :class:`ElectromagneticFieldData` A data instance to compute the dot product with. conjugate : bool, optional If ``True`` (default), the dot product is defined as above. If ``False``, the definition is similar, but without the complex conjugation of the $H$ fields. Note ---- The dot product with and without conjugation is equivalent (up to a phase) for modes in lossless waveguides but differs for modes in lossy materials. In that case, the conjugated dot product can be interpreted as the fraction of the power of the first mode carried by the second, but modes are not orthogonal with respect to that product and the sum of carried power fractions may be different from the total flux. In the non-conjugated definition, modes are orthogonal, but the interpretation of the dot product power carried by a given mode is no longer valid. """ # Tangential fields for current and other field data fields_self = self._colocated_tangential_fields fields_other = field_data._colocated_tangential_fields if conjugate: fields_self = {key: field.conj() for key, field in fields_self.items()} # Drop size-1 dimensions in the other data fields_other = {key: field.squeeze(drop=True) for key, field in fields_other.items()} # Cross products of fields dim1, dim2 = self._tangential_dims e_self_x_h_other = fields_self["E" + dim1] * fields_other["H" + dim2] e_self_x_h_other -= fields_self["E" + dim2] * fields_other["H" + dim1] h_self_x_e_other = fields_self["H" + dim1] * fields_other["E" + dim2] h_self_x_e_other -= fields_self["H" + dim2] * fields_other["E" + dim1] # Integrate over plane d_area = self._diff_area integrand = (e_self_x_h_other - h_self_x_e_other) * d_area return ModeAmpsDataArray(0.25 * integrand.sum(dim=d_area.dims))
def _interpolated_tangential_fields(self, coords: ArrayFloat2D) -> Dict[str, DataArray]: """For 2D monitors, interpolate this fields to given coords in the tangential plane. Parameters ---------- coords : ArrayFloat2D Interpolation coords in the monitor's tangential plane. Return ------ Dictionary with interpolated fields. """ fields = self._tangential_fields interp_dict = {"assume_sorted": True} for dim, cents in zip(self._tangential_dims, coords): if cents.size > 0: interp_dict[dim] = cents kwargs = {"bounds_error": False, "fill_value": 0.0} for component, field in fields.items(): fields[component] = field.interp(kwargs=kwargs, **interp_dict) return fields
[docs] def outer_dot( self, field_data: Union[FieldData, ModeData], conjugate: bool = True ) -> MixedModeDataArray: r"""Dot product (modal overlap) with another :class:`.FieldData` object. The tangential fields from ``field_data`` are interpolated to this object's grid, so the data arrays don't need to have the same discretization. The calculation is performed for all common frequencies between data arrays. In the output, ``mode_index_0`` and ``mode_index_1`` are the mode indices from this object and ``field_data``, respectively, if they are instances of ``ModeData``. The dot product is defined as: .. math: \frac{1}{4} \int \left( E_0 \times H_1^* + H_0^* \times E_1 \) \, {\rm d}S Parameters ---------- field_data : :class:`ElectromagneticFieldData` A data instance to compute the dot product with. conjugate : bool = True If ``True`` (default), the dot product is defined as above. If ``False``, the definition is similar, but without the complex conjugation of the $H$ fields. Returns ------- :class:`xarray.DataArray` Data array with the complex-valued modal overlaps between the two mode data. See also -------- :member:`dot` """ tan_dims = self._tangential_dims if not all(a == b for a, b in zip(tan_dims, field_data._tangential_dims)): raise DataError("Tangential dimensions must match between the two monitors.") # Tangential fields for current fields_self = self._colocated_tangential_fields if conjugate: fields_self = {component: field.conj() for component, field in fields_self.items()} # Tangential fields for other data fields_other = field_data._interpolated_tangential_fields(self._plane_grid_boundaries) # Tangential field component names dim1, dim2 = tan_dims e_1 = "E" + dim1 e_2 = "E" + dim2 h_1 = "H" + dim1 h_2 = "H" + dim2 # Prepare array with proper dimensions for the dot product data arrays = (fields_self[e_1], fields_other[e_1]) coords = (arrays[0].coords, arrays[1].coords) # Common frequencies to both data arrays f = np.array(sorted(set(coords[0]["f"].values).intersection(coords[1]["f"].values))) isel1 = [list(coords[0]["f"].values).index(freq) for freq in f] isel2 = [list(coords[1]["f"].values).index(freq) for freq in f] # Mode indices, if available modes_in_self = "mode_index" in coords[0] modes_in_other = "mode_index" in coords[1] keys = (e_1, e_2, h_1, h_2) for key in keys: fields_self[key] = fields_self[key].isel(f=isel1) if modes_in_self: fields_self[key] = fields_self[key].rename(mode_index="mode_index_0") else: fields_self[key] = fields_self[key].expand_dims( dim={"mode_index_0": [0]}, axis=len(fields_self[key].shape) ) fields_other[key] = fields_other[key].isel(f=isel2) if modes_in_other: fields_other[key] = fields_other[key].rename(mode_index="mode_index_1") else: fields_other[key] = fields_other[key].expand_dims( dim={"mode_index_1": [0]}, axis=len(fields_other[key].shape) ) d_area = self._diff_area.expand_dims(dim={"f": f}, axis=2).to_numpy() # function to apply at each pair of mode indices before integrating def fn(fields_1, fields_2): e_self_1 = fields_1[e_1] e_self_2 = fields_1[e_2] h_self_1 = fields_1[h_1] h_self_2 = fields_1[h_2] e_other_1 = fields_2[e_1] e_other_2 = fields_2[e_2] h_other_1 = fields_2[h_1] h_other_2 = fields_2[h_2] # Cross products of fields e_self_x_h_other = e_self_1 * h_other_2 - e_self_2 * h_other_1 h_self_x_e_other = h_self_1 * e_other_2 - h_self_2 * e_other_1 summand = 0.25 * (e_self_x_h_other - h_self_x_e_other) * d_area return summand result = self._outer_fn_summation( fields_1=fields_self, fields_2=fields_other, outer_dim_1="mode_index_0", outer_dim_2="mode_index_1", sum_dims=tan_dims, fn=fn, ) # Remove mode index coordinate if the input did not have it if not modes_in_self: result = result.isel(mode_index_0=0, drop=True) if not modes_in_other: result = result.isel(mode_index_1=0, drop=True) return result
@staticmethod def _outer_fn_summation( fields_1: Dict[str, xr.DataArray], fields_2: Dict[str, xr.DataArray], outer_dim_1: str, outer_dim_2: str, sum_dims: List[str], fn: Callable, ) -> xr.DataArray: """ Loop over ``outer_dim_1`` and ``outer_dim_2``, apply ``fn`` to ``fields_1`` and ``fields_2``, and sum over ``sum_dims``. The resulting ``xr.DataArray`` has has dimensions any dimensions in the fields which are not contained in sum_dims. This can be more memory efficient than vectorizing over the ``outer_dims``, which can involve broadcasting and reshaping data. It also converts to numpy arrays outside the loops to minimize xarray overhead. """ # first, convert to numpy outside the loop to reduce xarray overhead fields_1_numpy = {key: val.to_numpy() for key, val in fields_1.items()} fields_2_numpy = {key: val.to_numpy() for key, val in fields_2.items()} # get one of the data arrays to look at for indexing # assuming all data arrays have the same structure data_array_temp_1 = list(fields_1.values())[0] data_array_temp_2 = list(fields_2.values())[0] numpy_temp_1 = data_array_temp_1.to_numpy() numpy_temp_2 = data_array_temp_2.to_numpy() # find the numpy axes associated with the provided dimensions outer_axis_1 = data_array_temp_1.get_axis_num(outer_dim_1) outer_axis_2 = data_array_temp_2.get_axis_num(outer_dim_2) sum_axes = [data_array_temp_1.get_axis_num(dim) for dim in sum_dims] # coords and array for result of calculation coords = {key: val.to_numpy() for key, val in data_array_temp_1.coords.items()} for dim in sum_dims: coords.pop(dim) # last two inds are the outer_dims coords.pop(outer_dim_1) coords[outer_dim_1] = data_array_temp_1.coords[outer_dim_1].to_numpy() coords[outer_dim_2] = data_array_temp_2.coords[outer_dim_2].to_numpy() # drop scalar non-indexing dimensions coords = {key: val for key, val in coords.items() if len(val.shape) != 0} shape = [len(val) for val in coords.values()] dtype = np.promote_types(numpy_temp_1.dtype, numpy_temp_2.dtype) data = np.zeros(shape, dtype=dtype) # indexing tuples idx_1 = [slice(None)] * numpy_temp_1.ndim idx_2 = [slice(None)] * numpy_temp_2.ndim idx_data = [slice(None)] * data.ndim # calculate the sums of products for outer_1 in range(numpy_temp_1.shape[outer_axis_1]): for outer_2 in range(numpy_temp_2.shape[outer_axis_2]): idx_1[outer_axis_1] = outer_1 idx_2[outer_axis_2] = outer_2 idx_data[-2] = outer_1 idx_data[-1] = outer_2 fields_1_curr = {key: val[tuple(idx_1)] for key, val in fields_1_numpy.items()} fields_2_curr = {key: val[tuple(idx_2)] for key, val in fields_2_numpy.items()} summand_curr = fn(fields_1_curr, fields_2_curr) data_curr = np.sum(summand_curr, axis=tuple(sum_axes)) data[tuple(idx_data)] = data_curr return xr.DataArray(data, coords=coords) @property def time_reversed_copy(self) -> FieldData: """Make a copy of the data with time-reversed fields.""" # Time reversal for frequency-domain fields; overwritten in :class:`FieldTimeData`, # :class:`ModeData`, and :class:`ModeSolverData`. new_data = {} for comp, field in self.field_components.items(): if comp[0] == "H": new_data[comp] = -np.conj(field) else: new_data[comp] = np.conj(field) return self.copy(update=new_data)
[docs] class FieldData(FieldDataset, ElectromagneticFieldData): """ Data associated with a :class:`.FieldMonitor`: scalar components of E and H fields. Notes ----- The data is stored as a `DataArray <>`_ object using the `xarray <>`_ package. This dataset can contain all electric and magnetic field components: ``Ex``, ``Ey``, ``Ez``, ``Hx``, ``Hy``, and ``Hz``. Example ------- >>> from tidy3d import ScalarFieldDataArray >>> x = [-1,1,3] >>> y = [-2,0,2,4] >>> z = [-3,-1,1,3,5] >>> f = [2e14, 3e14] >>> coords = dict(x=x[:-1], y=y[:-1], z=z[:-1], f=f) >>> grid = Grid(boundaries=Coords(x=x, y=y, z=z)) >>> scalar_field = ScalarFieldDataArray((1+1j) * np.random.random((2,3,4,2)), coords=coords) >>> monitor = FieldMonitor( ... size=(2,4,6), freqs=[2e14, 3e14], name='field', fields=['Ex', 'Hz'], colocate=True ... ) >>> data = FieldData(monitor=monitor, Ex=scalar_field, Hz=scalar_field, grid_expanded=grid) .. TODO sort out standalone data example. See Also -------- **Notebooks:** * `Quickstart <../../notebooks/StartHere.html>`_: Usage in a basic simulation flow. * `Performing visualization of simulation data <../../notebooks/VizData.html>`_ * `Advanced monitor data manipulation and visualization <../../notebooks/XarrayTutorial.html>`_ """ monitor: FieldMonitor = pd.Field( ..., title="Monitor", description="Frequency-domain field monitor associated with the data." ) _contains_monitor_fields = enforce_monitor_fields_present()
[docs] def normalize(self, source_spectrum_fn: Callable[[float], complex]) -> FieldDataset: """Return copy of self after normalization is applied using source spectrum function.""" fields_norm = {} for field_name, field_data in self.field_components.items(): src_amps = source_spectrum_fn(field_data.f) fields_norm[field_name] = (field_data / src_amps).astype(field_data.dtype) return self.copy(update=fields_norm)
[docs] def to_source( self, source_time: SourceTimeType, center: Coordinate, size: Size = None, **kwargs ) -> CustomFieldSource: """Create a :class:`.CustomFieldSource` from the fields stored in the :class:`.FieldData`. Parameters ---------- source_time: :class:`.SourceTime` Specification of the source time-dependence. center: Tuple[float, float, float] Source center in x, y and z. size: Tuple[float, float, float] Source size in x, y, and z. If not provided, the size of the monitor associated to the data is used. **kwargs Extra keyword arguments passed to :class:`.CustomFieldSource`. Returns ------- :class:`.CustomFieldSource` Source injecting the fields stored in the :class:`.FieldData`, with other settings as provided in the input arguments. """ if not size: size = self.monitor.size fields = {} for name, field in self.symmetry_expanded_copy.field_components.items(): fields[name] = field.copy() for dim, dim_name in enumerate("xyz"): coords_shift = field.coords[dim_name] -[dim] fields[name].coords[dim_name] = coords_shift dataset = FieldDataset(**fields) return CustomFieldSource( field_dataset=dataset, source_time=source_time, center=center, size=size, **kwargs )
[docs] def make_adjoint_sources( self, dataset_names: list[str], fwidth: float ) -> List[Union[CustomCurrentSource, PointDipole]]: """Converts a :class:`.FieldData` to a list of adjoint current or point sources.""" if np.allclose(self.monitor.size, 0): return self.to_adjoint_point_sources(fwidth=fwidth) return self.to_adjoint_field_sources(fwidth=fwidth)
[docs] def to_adjoint_point_sources(self, fwidth: float) -> List[PointDipole]: """Create adjoint point dipole source if this field data contains one item.""" sources = [] for polarization, field_component in self.field_components.items(): if field_component is None: continue for freq0 in field_component.coords["f"]: omega0 = 2 * np.pi * freq0 scaling_factor = 1 / (MU_0 * omega0) forward_amp = self.get_amplitude(field_component.sel(f=freq0)) adj_phase = np.pi + np.angle(forward_amp) adj_amp = scaling_factor * forward_amp src_adj = PointDipole(, polarization=polarization, source_time=GaussianPulse( freq0=freq0, fwidth=fwidth, amplitude=abs(adj_amp), phase=adj_phase, ), interpolate=True, ) sources.append(src_adj) return sources
[docs] def to_adjoint_field_sources(self, fwidth: float) -> List[CustomCurrentSource]: """Create adjoint custom field sources if this field data has some dimensionality.""" sources = [] # Define source geometry based on coordinates in the data data_mins = [] data_maxs = [] def shift_value(coords) -> float: """How much to shift the geometry by along a dimension (only if > 1D).""" return SHIFT_VALUE_ADJ_FLD_SRC if len(coords) > 1 else 0 for _, field_component in self.field_components.items(): coords = field_component.coords data_mins.append({key: min(val) + shift_value(val) for key, val in coords.items()}) data_maxs.append({key: max(val) + shift_value(val) for key, val in coords.items()}) rmin = [] rmax = [] for dim in "xyz": rmin.append(max(val[dim] for val in data_mins)) rmax.append(min(val[dim] for val in data_maxs)) source_geo = Box.from_bounds(rmin=rmin, rmax=rmax) # Define source dataset # Offset coordinates by source center since local coords are assumed in CustomCurrentSource for freq0 in tuple(self.field_components.values())[0].coords["f"]: src_field_components = {} for name, field_component in self.field_components.items(): field_component = field_component.sel(f=freq0) forward_amps = field_component.values values = -1j * forward_amps coords = dict(field_component.coords.copy()) for dim, key in enumerate("xyz"): coords[key] = np.array(coords[key]) -[dim] coords["f"] = np.array([freq0]) values = np.expand_dims(values, axis=-1) if not np.all(values == 0): src_field_components[name] = ScalarFieldDataArray(values, coords=coords) dataset = FieldDataset(**src_field_components) custom_source = CustomCurrentSource(, size=source_geo.size, source_time=GaussianPulse( freq0=freq0, fwidth=fwidth, ), current_dataset=dataset, interpolate=True, ) sources.append(custom_source) return sources
[docs] class FieldTimeData(FieldTimeDataset, ElectromagneticFieldData): """ Data associated with a :class:`.FieldTimeMonitor`: scalar components of E and H fields. Notes ----- The data is stored as a `DataArray <>`_ object using the `xarray <>`_ package. Example ------- >>> from tidy3d import ScalarFieldTimeDataArray >>> x = [-1,1,3] >>> y = [-2,0,2,4] >>> z = [-3,-1,1,3,5] >>> t = [0, 1e-12, 2e-12] >>> coords = dict(x=x[:-1], y=y[:-1], z=z[:-1], t=t) >>> grid = Grid(boundaries=Coords(x=x, y=y, z=z)) >>> scalar_field = ScalarFieldTimeDataArray(np.random.random((2,3,4,3)), coords=coords) >>> monitor = FieldTimeMonitor( ... size=(2,4,6), interval=100, name='field', fields=['Ex', 'Hz'], colocate=True ... ) >>> data = FieldTimeData(monitor=monitor, Ex=scalar_field, Hz=scalar_field, grid_expanded=grid) """ monitor: FieldTimeMonitor = pd.Field( ..., title="Monitor", description="Time-domain field monitor associated with the data." ) _contains_monitor_fields = enforce_monitor_fields_present() @property def poynting(self) -> ScalarFieldTimeDataArray: """Instantaneous Poynting vector for time-domain data associated to a 2D monitor, projected to the direction normal to the monitor plane.""" # Tangential fields are ordered as E1, E2, H1, H2 tan_fields = self._colocated_tangential_fields dim1, dim2 = self._tangential_dims e_x_h = np.real(tan_fields["E" + dim1]) * np.real(tan_fields["H" + dim2]) e_x_h -= np.real(tan_fields["E" + dim2]) * np.real(tan_fields["H" + dim1]) return e_x_h @cached_property def flux(self) -> FluxTimeDataArray: """Flux for data corresponding to a 2D monitor.""" # Compute flux by integrating Poynting vector in-plane d_area = self._diff_area return FluxTimeDataArray((self.poynting * d_area).sum(dim=d_area.dims))
[docs] def dot(self, field_data: ElectromagneticFieldData, conjugate: bool = True) -> xr.DataArray: """Inner product is not defined for time-domain data.""" raise DataError("Inner product is not defined for time-domain data.")
@property def time_reversed_copy(self) -> FieldTimeData: """Make a copy of the data with time-reversed fields. The sign of the magnetic fields is flipped, and the data is reversed along the ``t`` dimension, such that for a given field, ``field[t_beg + t] -> field[t_end - t]``, where ``t_beg`` and ``t_end`` are the first and last coordinates along the ``t`` dimension. """ new_data = {} for comp, field in self.field_components.items(): if comp[0] == "H": new_data[comp] = -field else: new_data[comp] = field # Reverse time coordinates new_data[comp] = new_data[comp].assign_coords({"t": field.t[::-1]}).sortby("t") return self.copy(update=new_data)
[docs] class PermittivityData(PermittivityDataset, AbstractFieldData): """Data for a :class:`.PermittivityMonitor`: diagonal components of the permittivity tensor. Notes ----- The data is stored as a `DataArray <>`_ object using the `xarray <>`_ package. Example ------- >>> from tidy3d import ScalarFieldDataArray >>> x = [-1,1,3] >>> y = [-2,0,2,4] >>> z = [-3,-1,1,3,5] >>> f = [2e14, 3e14] >>> coords = dict(x=x[:-1], y=y[:-1], z=z[:-1], f=f) >>> grid = Grid(boundaries=Coords(x=x, y=y, z=z)) >>> sclr_fld = ScalarFieldDataArray((1+1j) * np.random.random((2,3,4,2)), coords=coords) >>> monitor = PermittivityMonitor(size=(2,4,6), freqs=[2e14, 3e14], name='eps') >>> data = PermittivityData( ... monitor=monitor, eps_xx=sclr_fld, eps_yy=sclr_fld, eps_zz=sclr_fld, grid_expanded=grid ... ) """ monitor: PermittivityMonitor = pd.Field( ..., title="Monitor", description="Permittivity monitor associated with the data." )
[docs] class ModeData(ModeSolverDataset, ElectromagneticFieldData): """ Data associated with a :class:`.ModeMonitor`: modal amplitudes, propagation indices and mode profiles. Notes ----- The data is stored as a `DataArray <>`_ object using the `xarray <>`_ package. The mode monitor data contains the complex effective indices and the complex mode amplitudes at the monitor position calculated by mode decomposition. The data structure of the complex effective indices :attr`n_complex` contains two coordinates: ``f`` and ``mode_index``, both of which are specified when defining the :class:``ModeMonitor`` in the simulation. Besides the effective index, :class:``ModeMonitor`` is primarily used to calculate the transmission of certain modes in certain directions. We can extract the complex amplitude and square it to compute the mode transmission power. Example ------- >>> from tidy3d import ModeSpec >>> from tidy3d import ModeAmpsDataArray, ModeIndexDataArray >>> direction = ["+", "-"] >>> f = [1e14, 2e14, 3e14] >>> mode_index = np.arange(5) >>> index_coords = dict(f=f, mode_index=mode_index) >>> index_data = ModeIndexDataArray((1+1j) * np.random.random((3, 5)), coords=index_coords) >>> amp_coords = dict(direction=direction, f=f, mode_index=mode_index) >>> amp_data = ModeAmpsDataArray((1+1j) * np.random.random((2, 3, 5)), coords=amp_coords) >>> monitor = ModeMonitor( ... size=(2,0,6), ... freqs=[2e14, 3e14], ... mode_spec=ModeSpec(num_modes=5), ... name='mode', ... ) >>> data = ModeData(monitor=monitor, amps=amp_data, n_complex=index_data) """ monitor: ModeMonitor = pd.Field( ..., title="Monitor", description="Mode monitor associated with the data." ) amps: ModeAmpsDataArray = pd.Field( ..., title="Amplitudes", description="Complex-valued amplitudes associated with the mode." ) eps_spec: List[EpsSpecType] = pd.Field( None, title="Permettivity Specification", description="Characterization of the permittivity profile on the plane where modes are " "computed. Possible values are 'diagonal', 'tensorial_real', 'tensorial_complex'.", )
[docs] @pd.validator("eps_spec", always=True) @skip_if_fields_missing(["monitor"]) def eps_spec_match_mode_spec(cls, val, values): """Raise validation error if frequencies in eps_spec does not match frequency list""" if val: mode_data_freqs = values["monitor"].freqs if len(val) != len(mode_data_freqs): raise ValidationError( "eps_spec must be provided at the same frequencies as mode solver data." ) return val
[docs] def normalize(self, source_spectrum_fn) -> ModeData: """Return copy of self after normalization is applied using source spectrum function.""" source_freq_amps = source_spectrum_fn(self.amps.f)[None, :, None] new_amps = (self.amps / source_freq_amps).astype(self.amps.dtype) return self.copy(update=dict(amps=new_amps))
[docs] def overlap_sort( self, track_freq: TrackFreq, overlap_thresh: float = 0.9, ) -> ModeData: """Starting from the base frequency defined by parameter ``track_freq``, sort modes at each frequency according to their overlap values with the modes at the previous frequency. That is, it attempts to rearrange modes in such a way that a given ``mode_index`` corresponds to physically the same mode at all frequencies. Modes with overlap values over ``overlap_tresh`` are considered matching and not rearranged. Parameters ---------- track_freq : Literal["central", "lowest", "highest"] Parameter that specifies which frequency will serve as a starting point in the reordering process. overlap_thresh : float = 0.9 Modal overlap threshold above which two modes are considered to be the same and are not rearranged. If after the sorting procedure the overlap value between two corresponding modes is less than this threshold, a warning about a possible discontinuity is displayed. """ if len(self.field_components) == 0: return self.copy() num_freqs = len(self.monitor.freqs) num_modes = self.monitor.mode_spec.num_modes if track_freq == "lowest": f0_ind = 0 elif track_freq == "highest": f0_ind = num_freqs - 1 elif track_freq == "central": f0_ind = num_freqs // 2 # Compute sorting order and overlaps with neighboring frequencies sorting = -np.ones((num_freqs, num_modes), dtype=int) overlap = np.zeros((num_freqs, num_modes)) phase = np.zeros((num_freqs, num_modes)) sorting[f0_ind, :] = np.arange(num_modes) # base frequency won't change overlap[f0_ind, :] = np.ones(num_modes) # Sort in two directions from the base frequency for step, last_ind in zip([-1, 1], [-1, num_freqs]): # Start with the base frequency data_template = self._isel(f=[f0_ind]) # March to lower/higher frequencies for freq_id in range(f0_ind + step, last_ind, step): # Get next frequency to sort data_to_sort = self._isel(f=[freq_id]) # Assign to the base frequency so that outer_dot will compare them data_to_sort = data_to_sort._assign_coords(f=[self.monitor.freqs[f0_ind]]) # Compute "sorting w.r.t. to neighbor" and overlap values sorting_one_mode, amps_one_mode = data_template._find_ordering_one_freq( data_to_sort, overlap_thresh ) # Transform "sorting w.r.t. neighbor" to "sorting w.r.t. to f0_ind" sorting[freq_id, :] = sorting_one_mode[sorting[freq_id - step, :]] overlap[freq_id, :] = np.abs(amps_one_mode[sorting[freq_id - step, :]]) phase[freq_id, :] = phase[freq_id - step, :] + np.angle( amps_one_mode[sorting[freq_id - step, :]] ) # Check for discontinuities and show warning if any for mode_ind in list(np.nonzero(overlap[freq_id, :] < overlap_thresh)[0]): log.warning( f"Mode '{mode_ind}' appears to undergo a discontinuous change " f"between frequencies '{self.monitor.freqs[freq_id]}' " f"and '{self.monitor.freqs[freq_id - step]}' " f"(overlap: '{overlap[freq_id, mode_ind]:.2f}')." ) # Reassign for the next iteration data_template = data_to_sort # Rearrange modes using computed sorting values mode_data_sorted = self._reorder_modes( sorting=sorting, phase=phase, track_freq=track_freq, ) return mode_data_sorted
def _isel(self, **isel_kwargs): """Wraps ``xarray.DataArray.isel`` for all data fields that are defined over frequency and mode index. Used in ``overlap_sort`` but not officially supported since for example ``self.monitor.mode_spec`` and ``self.monitor.freqs`` will no longer be matching the newly created data.""" update_dict = dict(self._grid_correction_dict, **self.field_components) update_dict = {key: field.isel(**isel_kwargs) for key, field in update_dict.items()} return self._updated(update=update_dict) def _assign_coords(self, **assign_coords_kwargs): """Wraps ``xarray.DataArray.assign_coords`` for all data fields that are defined over frequency and mode index. Used in ``overlap_sort`` but not officially supported since for example ``self.monitor.mode_spec`` and ``self.monitor.freqs`` will no longer be matching the newly created data.""" update_dict = dict(self._grid_correction_dict, **self.field_components) update_dict = { key: field.assign_coords(**assign_coords_kwargs) for key, field in update_dict.items() } return self._updated(update=update_dict) def _find_ordering_one_freq( self, data_to_sort: ModeData, overlap_thresh: float, ) -> Tuple[Numpy, Numpy]: """Find new ordering of modes in data_to_sort based on their similarity to own modes.""" num_modes = self.n_complex.sizes["mode_index"] # Current pairs and their overlaps pairs = np.arange(num_modes) complex_amps = if self.monitor.store_fields_direction == "-": complex_amps *= -1 # Check whether modes already match modes_to_sort = np.where(np.abs(complex_amps) < overlap_thresh)[0] num_modes_to_sort = len(modes_to_sort) if num_modes_to_sort <= 1: return pairs, complex_amps # Extract all modes of interest from template data data_template_reduced = self._isel(mode_index=modes_to_sort) amps_reduced = data_template_reduced.outer_dot( data_to_sort._isel(mode_index=modes_to_sort) ).to_numpy()[0, :, :] if self.monitor.store_fields_direction == "-": amps_reduced *= -1 # Find the most similar modes and corresponding overlap values pairs_reduced, amps_reduced = self._find_closest_pairs(amps_reduced) # Insert new sorting and overlap values into arrays with all data complex_amps[modes_to_sort] = amps_reduced pairs[modes_to_sort] = modes_to_sort[pairs_reduced] return pairs, complex_amps @staticmethod def _find_closest_pairs(arr: Numpy) -> Tuple[Numpy, Numpy]: """Given a complex overlap matrix pair row and column entries.""" n, k = np.shape(arr) if n != k: raise DataError("Overlap matrix must be square.") arr_abs = np.abs(arr) pairs = -np.ones(n, dtype=int) values = np.zeros(n, dtype=np.complex128) for _ in range(n): imax, jmax = np.unravel_index(np.argmax(arr_abs, axis=None), (n, k)) pairs[imax] = jmax values[imax] = arr[imax, jmax] arr_abs[imax, :] = -1 arr_abs[:, jmax] = -1 return pairs, values def _reorder_modes( self, sorting: Numpy, phase: Numpy, track_freq: TrackFreq, ) -> ModeData: """Rearrange modes for the i-th frequency according to sorting[i, :] and apply phase shifts.""" num_freqs, _ = np.shape(sorting) # Create new dict with rearranged field components update_dict = {} for field_name, field in self.field_components.items(): field_sorted = field.copy() # Rearrange modes for freq_id in range(num_freqs):[..., freq_id, :] =[ ..., freq_id, sorting[freq_id, :] ] # Apply phase shift phase_fact = np.exp(-1j * phase[None, None, None, :, :]).astype( = * phase_fact update_dict[field_name] = field_sorted # Rearrange data over f and mode_index data_dict = dict(**self._grid_correction_dict, n_complex=self.n_complex) for key, data in data_dict.items(): update_dict[key] = data.copy() for freq_id in range(num_freqs): update_dict[key].data[freq_id, :] = update_dict[key].data[ freq_id, sorting[freq_id, :] ] # Update mode_spec in the monitor mode_spec = self.monitor.mode_spec.copy(update=dict(track_freq=track_freq)) update_dict["monitor"] = self.monitor.copy(update=dict(mode_spec=mode_spec)) return self.copy(update=update_dict) def _group_index_post_process(self, frequency_step: float) -> ModeData: """Calculate group index and remove added frequencies used only for this calculation. Parameters ---------- frequency_step: float Fractional frequency step used to calculate the group index. Returns ------- :class:`.ModeData` Filtered data with calculated group index. """ freqs = self.n_complex.coords["f"].values num_freqs = freqs.size back = slice(0, num_freqs, 3) center = slice(1, num_freqs, 3) fwd = slice(2, num_freqs, 3) freqs = freqs[center] # calculate group index n_center = self.n_eff.isel(f=center).values n_backward = self.n_eff.isel(f=back).values n_forward = self.n_eff.isel(f=fwd).values inv_step = 1 / frequency_step # n_g = n + f * df/dn # dn/df = (n+ - n-) / (2 f df) n_group_data = n_center + (n_forward - n_backward) * inv_step * 0.5 # D = -2 * pi * c / lda^2 * d(v_g^-1)/dw = -(f / c)^2 * (2 * dn/df + f * d2n/df2) # d2n/df2 = (n+ - 2n + n-) / (f df)^2 # The '1e18' factor converts from s/um^2 to ps/(nm km) dispersion_data = ( (n_forward * (inv_step + 1) + n_backward * (inv_step - 1) - n_center * inv_step * 2) * freqs.reshape((-1, 1)) * (-1e18 * inv_step / C_0**2) ) mode_index = list(self.n_complex.coords["mode_index"].values) f = list(freqs) n_group = GroupIndexDataArray( n_group_data, coords={"f": f, "mode_index": mode_index}, ) dispersion = ModeDispersionDataArray( dispersion_data, coords={"f": f, "mode_index": mode_index}, ) # remove data corresponding to frequencies used only for group index calculation update_dict = { "n_complex": self.n_complex.isel(f=center), "n_group_raw": n_group, "dispersion_raw": dispersion, } for key, field in self.field_components.items(): update_dict[key] = field.isel(f=center) for key, data in self._grid_correction_dict.items(): update_dict[key] = data.isel(f=center) if self.eps_spec: update_dict["eps_spec"] = self.eps_spec[center] update_dict["monitor"] = self.monitor.updated_copy(freqs=freqs) return self.copy(update=update_dict) @property def time_reversed_copy(self) -> FieldData: """Make a copy of the data with direction-reversed fields. In lossy or gyrotropic systems, the time-reversed fields will not be the same as the backward-propagating modes.""" # Time reversal new_data = {} for comp, field in self.field_components.items(): if comp[0] == "H": new_data[comp] = -np.conj(field) else: new_data[comp] = np.conj(field) # switch direction in the monitor mnt = self.monitor new_dir = "+" if mnt.store_fields_direction == "-" else "-" new_data["monitor"] = mnt.updated_copy(store_fields_direction=new_dir) return self.copy(update=new_data) def _colocated_propagation_axes_field(self, field_name: Literal["E", "H"]) -> xr.DataArray: """Collect a field DataArray containing all 3 field components and rotate from frame with normal axis along z to frame with propagation axis along z. """ tan_dims = self._tangential_dims normal_dim = "xyz"[self.monitor.zero_dims[0]] fields = self._colocated_fields fields = {key: val.squeeze(dim=normal_dim, drop=True) for key, val in fields.items()} mode_spec = self.monitor.mode_spec # fields as a (3, ...) numpy array ordered as [tangential1, tagential2, normal] field = [fields[field_name + dim].values for dim in tan_dims] field = np.array(field + [fields[field_name + normal_dim].values]) # rotate axes if mode_spec.angle_phi != 0: field = self.monitor.rotate_points(field, [0, 0, 1], -mode_spec.angle_phi) if mode_spec.angle_theta != 0: field = self.monitor.rotate_points(field, [0, 1, 0], -mode_spec.angle_theta) # new coords for the (3, ...) array coords = {"component": [0, 1, 2]} # fields are colocated, so all components should have the same coords for dim in fields["Ex"].dims: coords.update({dim: fields["Ex"].coords[dim]}) return xr.DataArray(data=field, coords=coords) @cached_property def pol_fraction(self) -> xr.Dataset: r"""Compute the TE and TM polarization fraction defined as the field intensity along the first or the second of the two tangential axes. More precisely, if $E_1$ and $E_2$ are the electric field components along the two tangential axes, the TE fraction is defined as: .. math:: \frac{\int |E_1|^2 \, {\rm d}S}{\int \left(|E_1|^2 + |E_2|^2\right) \, {\rm d}S} and the TM fraction is equal to one minus the TE fraction. The tangential axes are defined by popping the normal axis from the list of ``x, y, z``, so e.g. ``x`` and ``z`` for propagation in the ``y`` direction. """ if len(self.field_components) == 0: raise DataError( "Field data not included in this ModeData onbject. Set " "'ModeMonitor.store_fields_direction' to the desired propagation direction to " "include the mode field profiles in the corresponding 'ModeData'." ) tan_dims = self._tangential_dims e_field = self._colocated_propagation_axes_field("E") diff_area = self._diff_area tm_int = (diff_area * np.abs(e_field.sel(component=1, drop=True)) ** 2).sum(dim=tan_dims) te_int = (diff_area * np.abs(e_field.sel(component=0, drop=True)) ** 2).sum(dim=tan_dims) te_frac = te_int / (te_int + tm_int) return xr.Dataset(data_vars={"te": te_frac, "tm": 1 - te_frac}) @cached_property def pol_fraction_waveguide(self) -> xr.Dataset: r"""Compute the TE and TM polarization fraction using the waveguide definition. If $n$ is the propagation direction, the TE fraction is defined as: .. math:: 1 - \frac{\int |E \cdot n|^2 \, {\rm d}S}{\int |E|^2 \, {\rm d}S} and the TM fraction is defined as .. math:: 1 - \frac{\int |H \cdot n|^2 \, {\rm d}S}{\int |H|^2 \, {\rm d}S} Note ---- The waveguide TE and TM fractions do not sum to one. For example, TEM modes that are completely transverse (zero electric and magnetic field in the propagation direction) have TE fraction and TM fraction both equal to one. """ if len(self.field_components) == 0: raise DataError( "Field data not included in this ModeData onbject. Set " "'ModeMonitor.store_fields_direction' to the desired propagation direction to " "include the mode field profiles in the corresponding 'ModeData'." ) tan_dims = self._tangential_dims e_field = self._colocated_propagation_axes_field("E") h_field = self._colocated_propagation_axes_field("H") diff_area = self._diff_area # te fraction field_int = [np.abs(e_field.sel(component=ind, drop=True)) ** 2 for ind in range(3)] norm_int = (diff_area * field_int[2]).sum(dim=tan_dims) tot_int = norm_int + (diff_area * (field_int[0] + field_int[1])).sum(dim=tan_dims) te_frac = 1 - norm_int / tot_int # tm fraction field_int = [np.abs(h_field.sel(component=ind, drop=True)) ** 2 for ind in range(3)] norm_int = (diff_area * field_int[2]).sum(dim=tan_dims) tot_int = norm_int + (diff_area * (field_int[0] + field_int[1])).sum(dim=tan_dims) tm_frac = 1 - norm_int / tot_int return xr.Dataset(data_vars={"te": te_frac, "tm": tm_frac}) @property def modes_info(self) -> xr.Dataset: """Dataset collecting various properties of the stored modes.""" lambda_cm = C_0 / self.k_eff.f / 1e4 loss_db_cm = 20 * 2 * np.pi * np.log10(np.e) * self.k_eff / lambda_cm info = { "wavelength": C_0 / self.n_eff.f, "n eff": self.n_eff, "k eff": self.k_eff, "loss (dB/cm)": loss_db_cm, f"TE (E{self._tangential_dims[0]}) fraction": None, "wg TE fraction": None, "wg TM fraction": None, "mode area": None, "group index": self.n_group_raw, # Use raw field to avoid issuing a warning "dispersion (ps/(nm km))": self.dispersion_raw, # Use raw field to avoid issuing a warning } if self.n_group_raw is not None: info["group index"] = self.n_group_raw if len(self.field_components) == 6: info["mode area"] = self.mode_area info[f"TE (E{self._tangential_dims[0]}) fraction"] = self.pol_fraction["te"] info["wg TE fraction"] = self.pol_fraction_waveguide["te"] info["wg TM fraction"] = self.pol_fraction_waveguide["tm"] return xr.Dataset(data_vars=info)
[docs] def to_dataframe(self) -> DataFrame: """xarray-like method to export the ``modes_info`` into a pandas dataframe which is e.g. simple to visualize as a table.""" dataset = self.modes_info drop = [] if not np.any(dataset["group index"].values): drop.append("group index") if not np.any(dataset["dispersion (ps/(nm km))"].values): drop.append("dispersion (ps/(nm km))") if np.all(dataset["loss (dB/cm)"] == 0): drop.append("loss (dB/cm)") return dataset.drop_vars(drop).to_dataframe()
[docs] def make_adjoint_sources(self, dataset_names: list[str], fwidth: float) -> list[ModeSource]: """Get all adjoint sources for the ``ModeMonitorData``.""" adjoint_sources = [] for name in dataset_names: if name == "amps": adjoint_sources += self.make_adjoint_sources_amps(fwidth=fwidth) else: log.warning( f"Can't create adjoint source for 'ModeData.{type(self)}.{name}'. " f"for monitor '{}'. " "It's likely your objective function depends on sim data that is un-traced. " "Double check your post-processing function to confirm. " ) return adjoint_sources
[docs] def make_adjoint_sources_amps(self, fwidth: float) -> list[ModeSource]: """Generate adjoint sources for ``ModeMonitorData.amps``.""" coords = self.amps.coords adjoint_sources = [] # TODO: speed up with ufunc? for freq in coords["f"]: for direction in coords["direction"]: for mode_index in coords["mode_index"]: amp_single = self.amps.sel(f=freq, direction=direction, mode_index=mode_index) if self.get_amplitude(amp_single) == 0.0: continue adjoint_source = self.adjoint_source_amp(amp=amp_single, fwidth=fwidth) adjoint_sources.append(adjoint_source) return adjoint_sources
[docs] def adjoint_source_amp(self, amp: DataArray, fwidth: float) -> ModeSource: """Generate an adjoint ``ModeSource`` for a single amplitude.""" monitor = self.monitor # grab coordinates coords = amp.coords freq0 = coords["f"] direction = coords["direction"] mode_index = coords["mode_index"] # determine the complex amplitude amp_complex = self.get_amplitude(amp) k0 = 2 * np.pi * freq0 / C_0 grad_const = k0 / 4 / ETA_0 src_amp = grad_const * amp_complex # construct source src_adj = ModeSource( source_time=GaussianPulse( amplitude=abs(src_amp), phase=np.angle(src_amp), freq0=freq0, fwidth=fwidth, ), mode_spec=monitor.mode_spec, size=monitor.size,, direction=self.flip_direction(direction), mode_index=mode_index, ) return src_adj
[docs] class ModeSolverData(ModeData): """ Data associated with a :class:`.ModeSolverMonitor`: scalar components of E and H fields. Notes ----- The data is stored as a `DataArray <>`_ object using the `xarray <>`_ package. Example ------- >>> from tidy3d import ModeSpec >>> from tidy3d import ScalarModeFieldDataArray, ModeIndexDataArray >>> x = [-1,1,3] >>> y = [-2,0] >>> z = [-3,-1,1,3,5] >>> f = [2e14, 3e14] >>> mode_index = np.arange(5) >>> grid = Grid(boundaries=Coords(x=x, y=y, z=z)) >>> field_coords = dict(x=x[:-1], y=y[:-1], z=z[:-1], f=f, mode_index=mode_index) >>> field = ScalarModeFieldDataArray((1+1j)*np.random.random((2,1,4,2,5)), coords=field_coords) >>> index_coords = dict(f=f, mode_index=mode_index) >>> index_data = ModeIndexDataArray((1+1j) * np.random.random((2,5)), coords=index_coords) >>> monitor = ModeSolverMonitor( ... size=(2,0,6), ... freqs=[2e14, 3e14], ... mode_spec=ModeSpec(num_modes=5), ... name='mode_solver', ... ) >>> data = ModeSolverData( ... monitor=monitor, ... Ex=field, ... Ey=field, ... Ez=field, ... Hx=field, ... Hy=field, ... Hz=field, ... n_complex=index_data, ... grid_expanded=grid ... ) """ monitor: ModeSolverMonitor = pd.Field( ..., title="Monitor", description="Mode solver monitor associated with the data." ) amps: ModeAmpsDataArray = pd.Field( None, title="Amplitudes", description="Unused for ModeSolverData." )
[docs] def normalize(self, source_spectrum_fn: Callable[[float], complex]) -> ModeSolverData: """Return copy of self after normalization is applied using source spectrum function.""" return self.copy()
@property def time_reversed_copy(self) -> FieldData: """Make a copy of the data with direction-reversed fields. In lossy or gyrotropic systems, the time-reversed fields will not be the same as the backward-propagating modes.""" # Time reversal new_data = {} for comp, field in self.field_components.items(): if comp[0] == "H": new_data[comp] = -np.conj(field) else: new_data[comp] = np.conj(field) # switch direction in the monitor mnt = self.monitor new_dir = "+" if mnt.store_fields_direction == "-" else "-" new_data["monitor"] = mnt.updated_copy(direction=new_dir, store_fields_direction=new_dir) return self.copy(update=new_data)
[docs] class FluxData(MonitorData): """ Data associated with a :class:`.FluxMonitor`: flux data in the frequency-domain. Notes ----- The data is stored as a `DataArray <>`_ object using the `xarray <>`_ package. We can access the data for each monitor by indexing into the :class:`SimulationData` with the monitor ``.name``. For the flux monitor data, we can access the raw flux data as a function of frequency with ``.flux``. As most data are multidimensional, it’s often very helpful to print out the data and directly inspect its structure. Example ------- >>> from tidy3d import FluxDataArray >>> f = [2e14, 3e14] >>> coords = dict(f=f) >>> flux_data = FluxDataArray(np.random.random(2), coords=coords) >>> monitor = FluxMonitor(size=(2,0,6), freqs=[2e14, 3e14], name='flux') >>> data = FluxData(monitor=monitor, flux=flux_data) See Also -------- **Notebooks:** * `Advanced monitor data manipulation and visualization <../../notebooks/XarrayTutorial.html>`_ """ monitor: FluxMonitor = pd.Field( ..., title="Monitor", description="Frequency-domain flux monitor associated with the data." ) flux: FluxDataArray = pd.Field( ..., title="Flux", description="Flux values in the frequency-domain." )
[docs] def normalize(self, source_spectrum_fn) -> FluxData: """Return copy of self after normalization is applied using source spectrum function.""" source_freq_amps = source_spectrum_fn(self.flux.f) source_power = abs(source_freq_amps) ** 2 new_flux = (self.flux / source_power).astype(self.flux.dtype) return self.copy(update=dict(flux=new_flux))
[docs] class FluxTimeData(MonitorData): """ Data associated with a :class:`.FluxTimeMonitor`: flux data in the time-domain. Notes ----- The data is stored as a `DataArray <>`_ object using the `xarray <>`_ package. Example ------- >>> from tidy3d import FluxTimeDataArray >>> t = [0, 1e-12, 2e-12] >>> coords = dict(t=t) >>> flux_data = FluxTimeDataArray(np.random.random(3), coords=coords) >>> monitor = FluxTimeMonitor(size=(2,0,6), interval=100, name='flux_time') >>> data = FluxTimeData(monitor=monitor, flux=flux_data) """ monitor: FluxTimeMonitor = pd.Field( ..., title="Monitor", description="Time-domain flux monitor associated with the data." ) flux: FluxTimeDataArray = pd.Field( ..., title="Flux", description="Flux values in the time-domain." )
ProjFieldType = Union[ FieldProjectionAngleDataArray, FieldProjectionCartesianDataArray, FieldProjectionKSpaceDataArray, DiffractionDataArray, ] ProjMonitorType = Union[ FieldProjectionAngleMonitor, FieldProjectionCartesianMonitor, FieldProjectionKSpaceMonitor, DiffractionMonitor, ]
[docs] class AbstractFieldProjectionData(MonitorData): """Collection of projected fields in spherical coordinates in the frequency domain.""" monitor: ProjMonitorType = pd.Field( ..., title="Projection monitor", description="Field projection monitor.", discriminator=TYPE_TAG_STR, ) Er: ProjFieldType = pd.Field( ..., title="Ephi", description="Spatial distribution of r-component of the electric field.", ) Etheta: ProjFieldType = pd.Field( ..., title="Etheta", description="Spatial distribution of the theta-component of the electric field.", ) Ephi: ProjFieldType = pd.Field( ..., title="Ephi", description="Spatial distribution of phi-component of the electric field.", ) Hr: ProjFieldType = pd.Field( ..., title="Hphi", description="Spatial distribution of r-component of the magnetic field.", ) Htheta: ProjFieldType = pd.Field( ..., title="Htheta", description="Spatial distribution of theta-component of the magnetic field.", ) Hphi: ProjFieldType = pd.Field( ..., title="Hphi", description="Spatial distribution of phi-component of the magnetic field.", ) medium: MediumType = pd.Field( Medium(), title="Background Medium", description="Background medium through which to project fields.", discriminator=TYPE_TAG_STR, ) is_2d_simulation: bool = pd.Field( False, title="2D Simulation", description="Indicates whether the monitor data is for a 2D simulation.", ) @property def field_components(self) -> Dict[str, DataArray]: """Maps the field components to their associated data.""" return dict( Er=self.Er, Etheta=self.Etheta, Ephi=self.Ephi, Hr=self.Hr, Htheta=self.Htheta, Hphi=self.Hphi, ) @property def f(self) -> np.ndarray: """Frequencies.""" return np.array(self.Etheta.coords["f"]) @property def coords(self) -> Dict[str, np.ndarray]: """Coordinates of the fields contained.""" return self.Etheta.coords @property def coords_spherical(self) -> Dict[str, np.ndarray]: """Coordinates grid for the fields in the spherical system.""" if "theta" in self.coords.keys(): r, theta, phi = np.meshgrid( self.coords["r"].values, self.coords["theta"].values, self.coords["phi"].values, indexing="ij", ) elif "z" in self.coords.keys(): xs, ys, zs = np.meshgrid( self.coords["x"].values, self.coords["y"].values, self.coords["z"].values, indexing="ij", ) r, theta, phi = self.monitor.car_2_sph(xs, ys, zs) else: uxs, uys, r = np.meshgrid( self.coords["ux"].values, self.coords["uy"].values, self.coords["r"].values, indexing="ij", ) theta, phi = self.monitor.kspace_2_sph(uxs, uys, self.monitor.proj_axis) return {"r": r, "theta": theta, "phi": phi} @property def dims(self) -> Tuple[str, ...]: """Dimensions of the radiation vectors contained.""" return self.Etheta.dims
[docs] def make_data_array(self, data: np.ndarray) -> xr.DataArray: """Make an xr.DataArray with data and same coords and dims as fields of self.""" return xr.DataArray(data=data, coords=self.coords, dims=self.dims)
[docs] def make_dataset(self, keys: Tuple[str, ...], vals: Tuple[np.ndarray, ...]) -> xr.Dataset: """Make an xr.Dataset with keys and data with same coords and dims as fields.""" data_arrays = tuple(map(self.make_data_array, vals)) return xr.Dataset(dict(zip(keys, data_arrays)))
[docs] def make_renormalized_data( self, phase: np.ndarray, proj_distance: float ) -> AbstractFieldProjectionData: """Helper to apply the re-projection phase to a copied dataset.""" new_data = self.copy() for field in new_data.field_components.values(): field.values *= phase if "r" in self.coords.keys(): field["r"] = np.atleast_1d(proj_distance) return new_data
[docs] def normalize( self, source_spectrum_fn: Callable[[float], complex] ) -> AbstractFieldProjectionData: """Return copy of self after normalization is applied using source spectrum function.""" fields_norm = {} for field_name, field_data in self.field_components.items(): src_amps = source_spectrum_fn(field_data.f) fields_norm[field_name] = (field_data / src_amps).astype(field_data.dtype) return self.copy(update=fields_norm)
[docs] @staticmethod def wavenumber(medium: MediumType, frequency: float) -> complex: """Complex valued wavenumber associated with a frequency.""" index_n, index_k = medium.nk_model(frequency=frequency) return (2 * np.pi * frequency / C_0) * (index_n + 1j * index_k)
@property def nk(self) -> Tuple[float, float]: """Returns the real and imaginary parts of the background medium's refractive index.""" return self.medium.nk_model(frequency=self.f) @property def k(self) -> complex: """Returns the complex wave number associated with the background medium.""" return self.wavenumber(medium=self.medium, frequency=self.f) @property def eta(self) -> complex: """Returns the complex wave impedance associated with the background medium.""" eps_complex = self.medium.eps_model(frequency=self.f) return ETA_0 / np.sqrt(eps_complex)
[docs] @staticmethod def propagation_factor(dist: Union[float, None], k: complex, is_2d_simulation: bool) -> complex: """A normalization factor that includes both phase and amplitude decay associated with propagation over a distance with a given wavenumber.""" if dist is None: return 1.0 if is_2d_simulation: return np.exp(1j * k * dist) * np.sqrt(-1j * k / (8 * np.pi * dist)) return -1j * k * np.exp(1j * k * dist) / (4 * np.pi * dist)
@property def fields_spherical(self) -> xr.Dataset: """Get all field components in spherical coordinates relative to the monitor's local origin for all projection grid points and frequencies specified in the :class:`AbstractFieldProjectionMonitor`. Returns ------- ``xarray.Dataset`` xarray dataset containing (``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi``) in spherical coordinates. """ return self.make_dataset( keys=self.field_components.keys(), vals=self.field_components.values() ) @property def fields_cartesian(self) -> xr.Dataset: """Get all field components in Cartesian coordinates relative to the monitor's local origin for all projection grid points and frequencies specified in the :class:`AbstractFieldProjectionMonitor`. Returns ------- ``xarray.Dataset`` xarray dataset containing (``Ex``, ``Ey``, ``Ez``, ``Hx``, ``Hy``, ``Hz``) in Cartesian coordinates. """ # convert the field components to the Cartesian coordinate system coords_sph = self.coords_spherical e_data = self.monitor.sph_2_car_field( self.Er.values, self.Etheta.values, self.Ephi.values, coords_sph["theta"][..., None], coords_sph["phi"][..., None], ) h_data = self.monitor.sph_2_car_field( self.Hr.values, self.Htheta.values, self.Hphi.values, coords_sph["theta"][..., None], coords_sph["phi"][..., None], ) # package into dataset keys = ("Ex", "Ey", "Ez", "Hx", "Hy", "Hz") field_components = np.concatenate((e_data, h_data), axis=0) return self.make_dataset(keys=keys, vals=field_components) @property def power(self) -> xr.DataArray: """Get power measured on the projection grid relative to the monitor's local origin. Returns ------- ``xarray.DataArray`` Power at points relative to the local origin. """ power_theta = 0.5 * np.real(self.Etheta.values * np.conj(self.Hphi.values)) power_phi = 0.5 * np.real(-self.Ephi.values * np.conj(self.Htheta.values)) power = power_theta + power_phi return self.make_data_array(data=power) @property def radar_cross_section(self) -> xr.DataArray: """Radar cross section in units of incident power.""" _, index_k = self.nk if not np.all(index_k == 0): raise SetupError("Can't compute RCS for a lossy background medium.") k = self.k[None, None, None, ...] eta = self.eta[None, None, None, ...] if self.is_2d_simulation: constant = k**2 / (16 * np.pi * eta) else: constant = k**2 / (8 * np.pi * eta) # normalize fields by the distance-based phase factor coords_sph = self.coords_spherical if coords_sph["r"] is None: phase = 1.0 else: phase = self.propagation_factor( dist=coords_sph["r"][..., None], k=k, is_2d_simulation=self.is_2d_simulation ) Etheta = self.Etheta.values / phase Ephi = self.Ephi.values / phase rcs_data = constant * (np.abs(Etheta) ** 2 + np.abs(Ephi) ** 2) return self.make_data_array(data=rcs_data)
[docs] class FieldProjectionAngleData(AbstractFieldProjectionData): """Data associated with a :class:`.FieldProjectionAngleMonitor`: components of projected fields. Example ------- >>> from tidy3d import FieldProjectionAngleDataArray >>> f = np.linspace(1e14, 2e14, 10) >>> r = np.atleast_1d(5) >>> theta = np.linspace(0, np.pi, 10) >>> phi = np.linspace(0, 2*np.pi, 20) >>> coords = dict(r=r, theta=theta, phi=phi, f=f) >>> values = (1+1j) * np.random.random((len(r), len(theta), len(phi), len(f))) >>> scalar_field = FieldProjectionAngleDataArray(values, coords=coords) >>> monitor = FieldProjectionAngleMonitor( ... center=(1,2,3), size=(2,2,2), freqs=f, name='n2f_monitor', phi=phi, theta=theta ... ) >>> data = FieldProjectionAngleData( ... monitor=monitor, Er=scalar_field, Etheta=scalar_field, Ephi=scalar_field, ... Hr=scalar_field, Htheta=scalar_field, Hphi=scalar_field, ... projection_surfaces=monitor.projection_surfaces, ... ) """ monitor: FieldProjectionAngleMonitor = pd.Field( ..., title="Projection monitor", description="Field projection monitor with an angle-based projection grid.", ) projection_surfaces: Tuple[FieldProjectionSurface, ...] = pd.Field( ..., title="Projection surfaces", description="Surfaces of the monitor where near fields were recorded for projection", ) Er: FieldProjectionAngleDataArray = pd.Field( ..., title="Er", description="Spatial distribution of r-component of the electric field.", ) Etheta: FieldProjectionAngleDataArray = pd.Field( ..., title="Etheta", description="Spatial distribution of the theta-component of the electric field.", ) Ephi: FieldProjectionAngleDataArray = pd.Field( ..., title="Ephi", description="Spatial distribution of phi-component of the electric field.", ) Hr: FieldProjectionAngleDataArray = pd.Field( ..., title="Hr", description="Spatial distribution of r-component of the magnetic field.", ) Htheta: FieldProjectionAngleDataArray = pd.Field( ..., title="Htheta", description="Spatial distribution of theta-component of the magnetic field.", ) Hphi: FieldProjectionAngleDataArray = pd.Field( ..., title="Hphi", description="Spatial distribution of phi-component of the magnetic field.", ) @property def r(self) -> np.ndarray: """Radial distance.""" return self.Etheta.r.values @property def theta(self) -> np.ndarray: """Polar angles.""" return self.Etheta.theta.values @property def phi(self) -> np.ndarray: """Azimuthal angles.""" return self.Etheta.phi.values
[docs] def renormalize_fields(self, proj_distance: float) -> FieldProjectionAngleData: """Return a :class:`.FieldProjectionAngleData` with fields re-normalized to a new projection distance, by applying a phase factor based on ``proj_distance``. Parameters ---------- proj_distance : float = None (micron) new radial distance relative to the monitor's local origin. Returns ------- :class:`.FieldProjectionAngleData` Copy of this :class:`.FieldProjectionAngleData` with fields re-projected to ``proj_distance``. """ if self.monitor and not self.monitor.far_field_approx: raise DataError( "Fields projected without invoking the far field approximation " "cannot be re-projected to a new distance." ) # the phase factor associated with the old distance must be removed r = self.coords_spherical["r"][..., None] old_phase = self.propagation_factor( dist=r, k=self.k[None, None, None, :], is_2d_simulation=self.is_2d_simulation ) # the phase factor associated with the new distance must be applied new_phase = self.propagation_factor( dist=proj_distance, k=self.k, is_2d_simulation=self.is_2d_simulation ) # net phase phase = new_phase[None, None, None, :] / old_phase # compute updated fields and their coordinates return self.make_renormalized_data(phase, proj_distance)
[docs] class FieldProjectionCartesianData(AbstractFieldProjectionData): """Data associated with a :class:`.FieldProjectionCartesianMonitor`: components of projected fields. Example ------- >>> from tidy3d import FieldProjectionCartesianDataArray >>> f = np.linspace(1e14, 2e14, 10) >>> x = np.linspace(0, 5, 10) >>> y = np.linspace(0, 10, 20) >>> z = np.atleast_1d(5) >>> coords = dict(x=x, y=y, z=z, f=f) >>> values = (1+1j) * np.random.random((len(x), len(y), len(z), len(f))) >>> scalar_field = FieldProjectionCartesianDataArray(values, coords=coords) >>> monitor = FieldProjectionCartesianMonitor( ... center=(1,2,3), size=(2,2,2), freqs=f, name='n2f_monitor', x=x, y=y, ... proj_axis=2, proj_distance=50 ... ) >>> data = FieldProjectionCartesianData( ... monitor=monitor, Er=scalar_field, Etheta=scalar_field, Ephi=scalar_field, ... Hr=scalar_field, Htheta=scalar_field, Hphi=scalar_field, ... projection_surfaces=monitor.projection_surfaces, ... ) """ monitor: FieldProjectionCartesianMonitor = pd.Field( ..., title="Projection monitor", description="Field projection monitor with a Cartesian projection grid.", ) projection_surfaces: Tuple[FieldProjectionSurface, ...] = pd.Field( ..., title="Projection surfaces", description="Surfaces of the monitor where near fields were recorded for projection", ) Er: FieldProjectionCartesianDataArray = pd.Field( ..., title="Er", description="Spatial distribution of r-component of the electric field.", ) Etheta: FieldProjectionCartesianDataArray = pd.Field( ..., title="Etheta", description="Spatial distribution of the theta-component of the electric field.", ) Ephi: FieldProjectionCartesianDataArray = pd.Field( ..., title="Ephi", description="Spatial distribution of phi-component of the electric field.", ) Hr: FieldProjectionCartesianDataArray = pd.Field( ..., title="Hr", description="Spatial distribution of r-component of the magnetic field.", ) Htheta: FieldProjectionCartesianDataArray = pd.Field( ..., title="Htheta", description="Spatial distribution of theta-component of the magnetic field.", ) Hphi: FieldProjectionCartesianDataArray = pd.Field( ..., title="Hphi", description="Spatial distribution of phi-component of the magnetic field.", ) @property def x(self) -> np.ndarray: """X positions.""" return self.Etheta.x.values @property def y(self) -> np.ndarray: """Y positions.""" return self.Etheta.y.values @property def z(self) -> np.ndarray: """Z positions.""" return self.Etheta.z.values @property def tangential_dims(self): tangential_dims = ["x", "y", "z"] tangential_dims.pop(self.monitor.proj_axis) return tangential_dims @property def poynting(self) -> xr.DataArray: """Time-averaged Poynting vector for field data associated to a Cartesian field projection monitor.""" fc = self.fields_cartesian dim1, dim2 = self.tangential_dims e1 = fc["E" + dim1] e2 = fc["E" + dim2] h1 = fc["H" + dim1] h2 = fc["H" + dim2] e1_h2 = e1 * h2.conj() e2_h1 = e2 * h1.conj() e_x_h_star = e1_h2 - e2_h1 return 0.5 * np.real(e_x_h_star) @cached_property def flux(self) -> FluxDataArray: """Flux for projecteded field data corresponding to a Cartesian field projection monitor.""" flux = self.poynting.integrate(self.tangential_dims) return FluxDataArray(flux)
[docs] def renormalize_fields(self, proj_distance: float) -> FieldProjectionCartesianData: """Return a :class:`.FieldProjectionCartesianData` with fields re-normalized to a new projection distance, by applying a phase factor based on ``proj_distance``. Parameters ---------- proj_distance : float = None (micron) new plane distance relative to the monitor's local origin. Returns ------- :class:`.FieldProjectionCartesianData` Copy of this :class:`.FieldProjectionCartesianData` with fields re-projected to ``proj_distance``. """ if not self.monitor.far_field_approx: raise DataError( "Fields projected without invoking the far field approximation " "cannot be re-projected to a new distance." ) # the phase factor associated with the old distance must be removed k = self.k[None, None, None, :] r = self.coords_spherical["r"][..., None] old_phase = self.propagation_factor(dist=r, k=k, is_2d_simulation=self.is_2d_simulation) # update the field components' projection distance norm_dir, _ = self.monitor.pop_axis(["x", "y", "z"], axis=self.monitor.proj_axis) for field in self.field_components.values(): field[norm_dir] = np.atleast_1d(proj_distance) # the phase factor associated with the new distance must be applied r = self.coords_spherical["r"][..., None] new_phase = self.propagation_factor(dist=r, k=k, is_2d_simulation=self.is_2d_simulation) # net phase phase = new_phase / old_phase # compute updated fields and their coordinates return self.make_renormalized_data(phase, proj_distance)
[docs] class FieldProjectionKSpaceData(AbstractFieldProjectionData): """Data associated with a :class:`.FieldProjectionKSpaceMonitor`: components of projected fields. Example ------- >>> from tidy3d import FieldProjectionKSpaceDataArray >>> f = np.linspace(1e14, 2e14, 10) >>> ux = np.linspace(0, 0.4, 10) >>> uy = np.linspace(0, 0.6, 20) >>> r = np.atleast_1d(5) >>> coords = dict(ux=ux, uy=uy, r=r, f=f) >>> values = (1+1j) * np.random.random((len(ux), len(uy), len(r), len(f))) >>> scalar_field = FieldProjectionKSpaceDataArray(values, coords=coords) >>> monitor = FieldProjectionKSpaceMonitor( ... center=(1,2,3), size=(2,2,2), freqs=f, name='n2f_monitor', ux=ux, uy=uy, proj_axis=2 ... ) >>> data = FieldProjectionKSpaceData( ... monitor=monitor, Er=scalar_field, Etheta=scalar_field, Ephi=scalar_field, ... Hr=scalar_field, Htheta=scalar_field, Hphi=scalar_field, ... projection_surfaces=monitor.projection_surfaces, ... ) """ monitor: FieldProjectionKSpaceMonitor = pd.Field( ..., title="Projection monitor", description="Field projection monitor with a projection grid defined in k-space.", ) projection_surfaces: Tuple[FieldProjectionSurface, ...] = pd.Field( ..., title="Projection surfaces", description="Surfaces of the monitor where near fields were recorded for projection", ) Er: FieldProjectionKSpaceDataArray = pd.Field( ..., title="Er", description="Spatial distribution of r-component of the electric field.", ) Etheta: FieldProjectionKSpaceDataArray = pd.Field( ..., title="Etheta", description="Spatial distribution of the theta-component of the electric field.", ) Ephi: FieldProjectionKSpaceDataArray = pd.Field( ..., title="Ephi", description="Spatial distribution of phi-component of the electric field.", ) Hr: FieldProjectionKSpaceDataArray = pd.Field( ..., title="Hr", description="Spatial distribution of r-component of the magnetic field.", ) Htheta: FieldProjectionKSpaceDataArray = pd.Field( ..., title="Htheta", description="Spatial distribution of theta-component of the magnetic field.", ) Hphi: FieldProjectionKSpaceDataArray = pd.Field( ..., title="Hphi", description="Spatial distribution of phi-component of the magnetic field.", ) @property def ux(self) -> np.ndarray: """Reciprocal X positions.""" return self.Etheta.ux.values @property def uy(self) -> np.ndarray: """Reciprocal Y positions.""" return @property def r(self) -> np.ndarray: """Radial distance.""" return self.Etheta.r.values
[docs] def renormalize_fields(self, proj_distance: float) -> FieldProjectionKSpaceData: """Return a :class:`.FieldProjectionKSpaceData` with fields re-normalized to a new projection distance, by applying a phase factor based on ``proj_distance``. Parameters ---------- proj_distance : float = None (micron) new radial distance relative to the monitor's local origin. Returns ------- :class:`.FieldProjectionKSpaceData` Copy of this :class:`.FieldProjectionKSpaceData` with fields re-projected to ``proj_distance``. """ if self.monitor and not self.monitor.far_field_approx: raise DataError( "Fields projected without invoking the far field approximation " "cannot be re-projected to a new distance." ) # the phase factor associated with the old distance must be removed r = self.coords_spherical["r"][..., None] old_phase = self.propagation_factor( dist=r, k=self.k[None, None, None, :], is_2d_simulation=self.is_2d_simulation ) # the phase factor associated with the new distance must be applied new_phase = self.propagation_factor( dist=proj_distance, k=self.k, is_2d_simulation=self.is_2d_simulation ) # net phase phase = new_phase[None, None, None, :] / old_phase # compute updated fields and their coordinates return self.make_renormalized_data(phase, proj_distance)
[docs] class DiffractionData(AbstractFieldProjectionData): """Data for a :class:`.DiffractionMonitor`: complex components of diffracted far fields. Note ---- The diffraction data are separated into S and P polarizations. At normal incidence when S and P are undefined, P(S) corresponds to ``Ey``(``Ez``) polarization for monitor normal to x, P(S) corresponds to ``Ex``(``Ez``) polarization for monitor normal to y, and P(S) corresponds to ``Ex``(``Ey``) polarization for monitor normal to z. Example ------- >>> from tidy3d import DiffractionDataArray >>> f = np.linspace(1e14, 2e14, 10) >>> orders_x = list(range(-4, 5)) >>> orders_y = list(range(-6, 7)) >>> pol = ["s", "p"] >>> coords = dict(orders_x=orders_x, orders_y=orders_y, f=f) >>> values = (1+1j) * np.random.random((len(orders_x), len(orders_y), len(f))) >>> field = DiffractionDataArray(values, coords=coords) >>> monitor = DiffractionMonitor( ... center=(1,2,3), size=(np.inf,np.inf,0), freqs=f, name='diffraction' ... ) >>> data = DiffractionData( ... monitor=monitor, sim_size=[1,1], bloch_vecs=[1,2], ... Etheta=field, Ephi=field, Er=field, ... Htheta=field, Hphi=field, Hr=field, ... ) """ monitor: DiffractionMonitor = pd.Field( ..., title="Monitor", description="Diffraction monitor associated with the data." ) Er: DiffractionDataArray = pd.Field( ..., title="Er", description="Spatial distribution of r-component of the electric field.", ) Etheta: DiffractionDataArray = pd.Field( ..., title="Etheta", description="Spatial distribution of the theta-component of the electric field.", ) Ephi: DiffractionDataArray = pd.Field( ..., title="Ephi", description="Spatial distribution of phi-component of the electric field.", ) Hr: DiffractionDataArray = pd.Field( ..., title="Hr", description="Spatial distribution of r-component of the magnetic field.", ) Htheta: DiffractionDataArray = pd.Field( ..., title="Htheta", description="Spatial distribution of theta-component of the magnetic field.", ) Hphi: DiffractionDataArray = pd.Field( ..., title="Hphi", description="Spatial distribution of phi-component of the magnetic field.", ) sim_size: Tuple[float, float] = pd.Field( ..., title="Domain size", description="Size of the near field in the local x and y directions.", units=MICROMETER, ) bloch_vecs: Tuple[float, float] = pd.Field( ..., title="Bloch vectors", description="Bloch vectors along the local x and y directions in units of " "``2 * pi / (simulation size along the respective dimension)``.", )
[docs] @staticmethod def shifted_orders(orders: Tuple[int, ...], bloch_vec: float) -> np.ndarray: """Diffraction orders shifted by the Bloch vector.""" return bloch_vec + np.atleast_1d(orders)
[docs] @staticmethod def reciprocal_coords( orders: np.ndarray, size: float, bloch_vec: float, f: float, medium: MediumType ) -> np.ndarray: """Get the normalized "u" reciprocal coords for a vector of orders, size, and bloch vec.""" if size == 0: return np.atleast_2d(0) epsilon = medium.eps_model(f) bloch_array = DiffractionData.shifted_orders(orders, bloch_vec) return bloch_array[:, None] / size * C_0 / f / np.real(np.sqrt(epsilon))
[docs] @staticmethod def compute_angles( reciprocal_vectors: Tuple[np.ndarray, np.ndarray], ) -> Tuple[np.ndarray, np.ndarray]: """Compute the polar and azimuth angles associated with the given reciprocal vectors.""" # some wave number pairs are outside the light cone, leading to warnings from numpy.arcsin with warnings.catch_warnings(): warnings.filterwarnings( "ignore", message="invalid value encountered in arcsin", category=RuntimeWarning ) ux, uy = reciprocal_vectors thetas, phis = DiffractionMonitor.kspace_2_sph(ux[:, None, :], uy[None, :, :], axis=2) return (thetas, phis)
@property def coords_spherical(self) -> Dict[str, np.ndarray]: """Coordinates grid for the fields in the spherical system.""" theta, phi = self.angles return {"r": None, "theta": theta, "phi": phi} @property def orders_x(self) -> np.ndarray: """Allowed orders along x.""" return np.atleast_1d(np.array(self.Etheta.coords["orders_x"])) @property def orders_y(self) -> np.ndarray: """Allowed orders along y.""" return np.atleast_1d(np.array(self.Etheta.coords["orders_y"])) @property def reciprocal_vectors(self) -> Tuple[np.ndarray, np.ndarray]: """Get the normalized "ux" and "uy" reciprocal vectors.""" return (self.ux, @property def ux(self) -> np.ndarray: """Normalized wave vector along x relative to ``local_origin`` and oriented with respect to ``monitor.normal_dir``, normalized by the wave number in the projection medium.""" return self.reciprocal_coords( orders=self.orders_x, size=self.sim_size[0], bloch_vec=self.bloch_vecs[0], f=self.f, medium=self.medium, ) @property def uy(self) -> np.ndarray: """Normalized wave vector along y relative to ``local_origin`` and oriented with respect to ``monitor.normal_dir``, normalized by the wave number in the projection medium.""" return self.reciprocal_coords( orders=self.orders_y, size=self.sim_size[1], bloch_vec=self.bloch_vecs[1], f=self.f, medium=self.medium, ) @property def angles(self) -> Tuple[xr.DataArray]: """The (theta, phi) angles corresponding to each allowed pair of diffraction orders storeds as data arrays. Disallowed angles are set to ``np.nan``. """ thetas, phis = self.compute_angles(self.reciprocal_vectors) theta_data = xr.DataArray(thetas, coords=self.coords) phi_data = xr.DataArray(phis, coords=self.coords) return theta_data, phi_data @property def amps(self) -> xr.DataArray: """Complex power amplitude in each order for 's' and 'p' polarizations, normalized so that the power carried by the wave of that order and polarization equals ``abs(amps)^2``. """ cos_theta = np.cos(np.nan_to_num(self.angles[0])) # will set amplitudes to 0 for glancing or negative angles cos_theta[cos_theta <= 0] = np.inf norm = 1.0 / np.sqrt(2.0 * self.eta) / np.sqrt(cos_theta) amp_theta = self.Etheta.values * norm amp_phi = self.Ephi.values * norm # stack the amplitudes in s- and p-components along a new polarization axis coords = {} coords["orders_x"] = np.atleast_1d(self.orders_x) coords["orders_y"] = np.atleast_1d(self.orders_y) coords["f"] = np.atleast_1d(self.f) coords["polarization"] = ["s", "p"] return xr.DataArray(np.stack([amp_phi, amp_theta], axis=3), coords=coords) @property def power(self) -> xr.DataArray: """Total power in each order, summed over both polarizations.""" return (np.abs(self.amps) ** 2).sum(dim="polarization") @property def fields_spherical(self) -> xr.Dataset: """Get all field components in spherical coordinates relative to the monitor's local origin for all allowed diffraction orders and frequencies specified in the :class:`DiffractionMonitor`. Returns ------- ``xarray.Dataset`` xarray dataset containing (``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi``) in spherical coordinates. """ fields = [field.values for field in self.field_components.values()] keys = ["Er", "Etheta", "Ephi", "Hr", "Htheta", "Hphi"] return self._make_dataset(fields, keys) @property def fields_cartesian(self) -> xr.Dataset: """Get all field components in Cartesian coordinates relative to the monitor's local origin for all allowed diffraction orders and frequencies specified in the :class:`DiffractionMonitor`. Returns ------- ``xarray.Dataset`` xarray dataset containing (``Ex``, ``Ey``, ``Ez``, ``Hx``, ``Hy``, ``Hz``) in Cartesian coordinates. """ theta, phi = self.angles theta = theta.values phi = phi.values e_x, e_y, e_z = self.monitor.sph_2_car_field( 0, self.Etheta.values, self.Ephi.values, theta, phi ) h_x, h_y, h_z = self.monitor.sph_2_car_field( 0, self.Htheta.values, self.Hphi.values, theta, phi ) e_x, e_y, e_z, h_x, h_y, h_z = ( np.nan_to_num(fld) for fld in [e_x, e_y, e_z, h_x, h_y, h_z] ) fields = [e_x, e_y, e_z, h_x, h_y, h_z] keys = ["Ex", "Ey", "Ez", "Hx", "Hy", "Hz"] return self._make_dataset(fields, keys) def _make_dataset(self, fields: Tuple[np.ndarray, ...], keys: Tuple[str, ...]) -> xr.Dataset: """Make an xr.Dataset for fields with given field names.""" data_arrays = [] for field in fields: data_arrays.append(xr.DataArray(data=field, coords=self.coords, dims=self.dims)) return xr.Dataset(dict(zip(keys, data_arrays))) """ Autograd code """
[docs] def make_adjoint_sources(self, dataset_names: list[str], fwidth: float) -> list[PlaneWave]: """Get all adjoint sources for the ``DiffractionMonitor.amps``.""" # NOTE: everything just goes through `.amps`, any post-processing is encoded in E-fields return self.make_adjoint_sources_amps(fwidth=fwidth)
[docs] def make_adjoint_sources_amps(self, fwidth: float) -> list[PlaneWave]: """Make adjoint sources for outputs that depend on DiffractionData.`amps`.""" amps = self.amps coords = amps.coords adjoint_sources = [] # TODO: speed up with ufunc? # loop over all coordinates in the diffraction amplitudes for freq in coords["f"]: for pol in coords["polarization"]: for order_x in coords["orders_x"]: for order_y in coords["orders_y"]: amp_single = amps.sel( f=freq, polarization=pol, orders_x=order_x, orders_y=order_y, ) # ignore any amplitudes of 0.0 or nan amp_complex = self.get_amplitude(amp_single) if (amp_complex == 0.0) or np.isnan(amp_complex): continue # compute a plane wave for this amplitude (if propagating / not None) adjoint_source = self.adjoint_source_amp(amp=amp_single, fwidth=fwidth) if adjoint_source is not None: adjoint_sources.append(adjoint_source) return adjoint_sources
[docs] def adjoint_source_amp(self, amp: DataArray, fwidth: float) -> PlaneWave: """Generate an adjoint ``PlaneWave`` for a single amplitude.""" monitor = self.monitor # grab the coordinates coords = amp.coords freq0 = coords["f"] pol = coords["polarization"] order_x = coords["orders_x"] order_y = coords["orders_y"] # compute the angle corresponding to this amplitude theta_data, phi_data = self.angles angle_sel_kwargs = dict(orders_x=int(order_x), orders_y=int(order_y), f=float(freq0)) angle_theta = float(theta_data.sel(**angle_sel_kwargs)) angle_phi = float(phi_data.sel(**angle_sel_kwargs)) # if the angle is nan, this amplitude is set to 0 in the fwd pass, so should skip adj if np.isnan(angle_theta): return None # get the polarization angle from the data pol_str = str(pol.values) if pol_str not in ("p", "s"): raise ValueError(f"Something went wrong, given pol='{pol_str}' in adjoint source.") pol_angle = 0.0 if pol_str == "p" else np.pi / 2 # compute the source amplitude amp_complex = self.get_amplitude(amp) k0 = 2 * np.pi * freq0 / C_0 bck_eps = self.medium.eps_model(freq0) grad_const = 0.5 * k0 / np.sqrt(bck_eps) * np.cos(angle_theta) src_amp = grad_const * amp_complex # construct plane wave source adj_src = PlaneWave( size=self.monitor.size,, source_time=GaussianPulse( amplitude=abs(src_amp), phase=np.angle(src_amp), freq0=freq0, fwidth=fwidth, ), direction=self.flip_direction(monitor.normal_dir), angle_theta=angle_theta, angle_phi=angle_phi, pol_angle=pol_angle, ) return adj_src
MonitorDataTypes = ( FieldData, FieldTimeData, PermittivityData, ModeSolverData, ModeData, FluxData, FluxTimeData, FieldProjectionKSpaceData, FieldProjectionCartesianData, FieldProjectionAngleData, DiffractionData, ) MonitorDataType = Union[MonitorDataTypes]