Source code for tidy3d.components.microwave.path_integrals.integrals.base

"""Base classes for performing path integrals with fields on the Yee grid"""

from __future__ import annotations

from typing import Literal, Union

import numpy as np
import xarray as xr

from tidy3d.components.data.data_array import (
    IntegralResultType,
    ScalarFieldDataArray,
    ScalarFieldTimeDataArray,
    ScalarModeFieldDataArray,
    _make_base_result_data_array,
)
from tidy3d.components.data.monitor_data import FieldData, FieldTimeData, ModeData, ModeSolverData
from tidy3d.components.microwave.path_integrals.specs.base import (
    AxisAlignedPathIntegralSpec,
    Custom2DPathIntegralSpec,
)
from tidy3d.constants import fp_eps
from tidy3d.exceptions import DataError

IntegrableMonitorDataType = Union[FieldData, FieldTimeData, ModeData, ModeSolverData]
EMScalarFieldType = Union[ScalarFieldDataArray, ScalarFieldTimeDataArray, ScalarModeFieldDataArray]
FieldParameter = Literal["E", "H"]


[docs] class AxisAlignedPathIntegral(AxisAlignedPathIntegralSpec): """Class for defining the simplest type of path integral, which is aligned with Cartesian axes. Example ------- >>> path = AxisAlignedPathIntegral( ... center=(0, 0, 1), ... size=(0, 0, 2), ... extrapolate_to_endpoints=True, ... snap_path_to_grid=False, ... ) """
[docs] def compute_integral(self, scalar_field: EMScalarFieldType) -> IntegralResultType: """Computes the defined integral given the input ``scalar_field``.""" if not scalar_field.does_cover(self.bounds, fp_eps, np.finfo(np.float32).smallest_normal): raise DataError("Scalar field does not cover the integration domain.") coord = "xyz"[self.main_axis] scalar_field = self._get_field_along_path(scalar_field) # Get the boundaries min_bound = self.bounds[0][self.main_axis] max_bound = self.bounds[1][self.main_axis] if self.extrapolate_to_endpoints: # Remove field outside the boundaries scalar_field = scalar_field.sel({coord: slice(min_bound, max_bound)}) # Ignore values on the boundary (sel is inclusive) scalar_field = scalar_field.drop_sel({coord: (min_bound, max_bound)}, errors="ignore") coordinates = scalar_field.coords[coord].values else: coordinates = scalar_field.coords[coord].sel({coord: slice(min_bound, max_bound)}) # Integration is along the original coordinates plus ensure that # endpoints corresponding to the precise bounds of the port are included coords_interp = np.array([min_bound]) coords_interp = np.concatenate((coords_interp, coordinates)) coords_interp = np.concatenate((coords_interp, [max_bound])) coords_interp = {coord: coords_interp} # Use extrapolation for the 2 additional endpoints, unless there is only a single sample point method = "linear" if len(coordinates) == 1 and self.extrapolate_to_endpoints: method = "nearest" scalar_field = scalar_field.interp( coords_interp, method=method, kwargs={"fill_value": "extrapolate"} ) result = scalar_field.integrate(coord=coord) return _make_base_result_data_array(result)
def _get_field_along_path(self, scalar_field: EMScalarFieldType) -> EMScalarFieldType: """Returns a selection of the input ``scalar_field`` ready for integration.""" (axis1, axis2) = self.remaining_axes (coord1, coord2) = self.remaining_dims if self.snap_path_to_grid: # Coordinates that are not integrated remaining_coords = { coord1: self.center[axis1], coord2: self.center[axis2], } # Select field nearest to center of integration line scalar_field = scalar_field.sel( remaining_coords, method="nearest", drop=False, ) else: # Try to interpolate unless there is only a single coordinate coord1dict = {coord1: self.center[axis1]} if scalar_field.sizes[coord1] == 1: scalar_field = scalar_field.sel(coord1dict, method="nearest") else: scalar_field = scalar_field.interp( coord1dict, method="linear", kwargs={"bounds_error": True} ) coord2dict = {coord2: self.center[axis2]} if scalar_field.sizes[coord2] == 1: scalar_field = scalar_field.sel(coord2dict, method="nearest") else: scalar_field = scalar_field.interp( coord2dict, method="linear", kwargs={"bounds_error": True} ) # Remove unneeded coordinates scalar_field = scalar_field.reset_coords(drop=True) return scalar_field @staticmethod def _check_monitor_data_supported(em_field: IntegrableMonitorDataType) -> None: """Helper for validating that monitor data is supported.""" if not isinstance(em_field, (FieldData, FieldTimeData, ModeData, ModeSolverData)): supported_types = list(IntegrableMonitorDataType.__args__) raise DataError( f"'em_field' type {type(em_field)} not supported. Supported types are " f"{supported_types}" )
[docs] class Custom2DPathIntegral(Custom2DPathIntegralSpec): """Class for defining a custom path integral defined as a curve on an axis-aligned plane. Notes ----- Given a set of vertices :math:`\\vec{r}_i`, this class approximates path integrals over vector fields of the form :math:`\\int{\\vec{F} \\cdot \\vec{dl}}` as :math:`\\sum_i{\\vec{F}(\\vec{r}_i) \\cdot \\vec{dl}_i}`, where the differential length :math:`\\vec{dl}` is approximated using central differences :math:`\\vec{dl}_i = \\frac{\\vec{r}_{i+1} - \\vec{r}_{i-1}}{2}`. If the path is not closed, forward and backward differences are used at the endpoints. Example ------- >>> import numpy as np >>> vertices = np.array([[0, 0], [1, 0], [1, 1], [0, 1]]) >>> path = Custom2DPathIntegral( ... axis=2, ... position=0.5, ... vertices=vertices, ... ) """
[docs] def compute_integral( self, field: FieldParameter, em_field: IntegrableMonitorDataType ) -> IntegralResultType: """Computes the path integral defined by ``vertices`` given the input ``em_field``. Parameters ---------- field : :class:`.FieldParameter` Can take the value of ``"E"`` or ``"H"``. Determines whether to perform the integral over electric or magnetic field. em_field : :class:`.IntegrableMonitorDataType` The electromagnetic field data that will be used for integrating. Returns ------- :class:`.IntegralResultType` Result of integral over remaining dimensions (frequency, time, mode indices). """ (dim1, dim2, dim3) = self.local_dims h_field_name = f"{field}{dim1}" v_field_name = f"{field}{dim2}" # Validate that fields are present em_field._check_fields_stored([h_field_name, v_field_name]) # Select fields lying on the plane plane_indexer = {dim3: self.position} field1 = em_field.field_components[h_field_name].sel(plane_indexer, method="nearest") field2 = em_field.field_components[v_field_name].sel(plane_indexer, method="nearest") # Although for users we use the convention that an axis is simply `popped` # internally we prefer a right-handed coordinate system where dimensions # keep a proper order. The only change is to swap 'x' and 'z' when the # normal axis is along `y` # Dim 's' represents the parameterization of the line # 't' is likely used for time if self.main_axis == 1: x_path = xr.DataArray(self.vertices[:, 1], dims="s") y_path = xr.DataArray(self.vertices[:, 0], dims="s") else: x_path = xr.DataArray(self.vertices[:, 0], dims="s") y_path = xr.DataArray(self.vertices[:, 1], dims="s") path_indexer = {dim1: x_path, dim2: y_path} field1_interp = field1.interp(path_indexer, method="linear") field2_interp = field2.interp(path_indexer, method="linear") # Determine the differential length elements along the path dl_x = Custom2DPathIntegralSpec._compute_dl_component(x_path, self.is_closed_contour) dl_y = Custom2DPathIntegralSpec._compute_dl_component(y_path, self.is_closed_contour) dl_x = xr.DataArray(dl_x, dims="s") dl_y = xr.DataArray(dl_y, dims="s") # Compute the dot product between differential length element and vector field integrand = field1_interp * dl_x + field2_interp * dl_y # Integrate along the path result = integrand.integrate(coord="s") result = result.reset_coords(drop=True) return _make_base_result_data_array(result)