Source code for tidy3d.components.field_projection

"""Near field to far field transformation plugin
"""
from __future__ import annotations
from typing import Dict, Tuple, Union, List
import numpy as np
import xarray as xr
import pydantic.v1 as pydantic

from rich.progress import track

from .data.data_array import FieldProjectionAngleDataArray, FieldProjectionCartesianDataArray
from .data.data_array import FieldProjectionKSpaceDataArray
from .data.monitor_data import FieldData
from .data.monitor_data import AbstractFieldProjectionData, FieldProjectionAngleData
from .data.monitor_data import FieldProjectionCartesianData, FieldProjectionKSpaceData
from .data.sim_data import SimulationData
from .monitor import FieldProjectionSurface
from .monitor import FieldMonitor, AbstractFieldProjectionMonitor, FieldProjectionAngleMonitor
from .monitor import FieldProjectionCartesianMonitor, FieldProjectionKSpaceMonitor
from .types import Direction, Coordinate, ArrayComplex4D
from .medium import MediumType
from .base import Tidy3dBaseModel, cached_property, skip_if_fields_missing
from ..exceptions import SetupError
from ..constants import C_0, MICROMETER, ETA_0, EPSILON_0, MU_0
from ..log import get_logging_console

# Default number of points per wavelength in the background medium to use for resampling fields.
PTS_PER_WVL = 10

# Numpy float array and related array types

ArrayLikeN2F = Union[float, Tuple[float, ...], ArrayComplex4D]


[docs] class FieldProjector(Tidy3dBaseModel): """ Projection of near fields to points on a given observation grid. .. TODO make images to illustrate this See Also -------- :class:`FieldProjectionAngleMonitor :class:`Monitor` that samples electromagnetic near fields in the frequency domain and projects them at given observation angles.` **Notebooks**: * `Performing near field to far field projections <../../notebooks/FieldProjections.html>`_ """ sim_data: SimulationData = pydantic.Field( ..., title="Simulation data", description="Container for simulation data containing the near field monitors.", ) surfaces: Tuple[FieldProjectionSurface, ...] = pydantic.Field( ..., title="Surface monitor with direction", description="Tuple of each :class:`.FieldProjectionSurface` to use as source of " "near field.", ) pts_per_wavelength: Union[int, type(None)] = pydantic.Field( PTS_PER_WVL, title="Points per wavelength", description="Number of points per wavelength in the background medium with which " "to discretize the surface monitors for the projection. If ``None``, fields will " "will not resampled, but will still be colocated.", ) origin: Coordinate = pydantic.Field( None, title="Local origin", description="Local origin used for defining observation points. If ``None``, uses the " "average of the centers of all surface monitors.", units=MICROMETER, ) currents: Dict[str, xr.Dataset] = pydantic.Field( None, title="Surface current densities", description="Dictionary mapping monitor name to an ``xarray.Dataset`` storing the " "surface current densities.", )
[docs] @pydantic.validator("origin", always=True) @skip_if_fields_missing(["surfaces"]) def set_origin(cls, val, values): """Sets .origin as the average of centers of all surface monitors if not provided.""" if val is None: surfaces = values.get("surfaces") val = np.array([surface.monitor.center for surface in surfaces]) return tuple(np.mean(val, axis=0)) return val
@cached_property def medium(self) -> MediumType: """Medium into which fields are to be projected.""" sim = self.sim_data.simulation monitor = self.surfaces[0].monitor return sim.monitor_medium(monitor) @cached_property def frequencies(self) -> List[float]: """Return the list of frequencies associated with the field monitors.""" return self.surfaces[0].monitor.freqs
[docs] @classmethod def from_near_field_monitors( cls, sim_data: SimulationData, near_monitors: List[FieldMonitor], normal_dirs: List[Direction], pts_per_wavelength: int = PTS_PER_WVL, origin: Coordinate = None, ): """Constructs :class:`FieldProjection` from a list of surface monitors and their directions. Parameters ---------- sim_data : :class:`.SimulationData` Container for simulation data containing the near field monitors. near_monitors : List[:class:`.FieldMonitor`] Tuple of :class:`.FieldMonitor` objects on which near fields will be sampled. normal_dirs : List[:class:`.Direction`] Tuple containing the :class:`.Direction` of the normal to each surface monitor w.r.t. to the positive x, y or z unit vectors. Must have the same length as monitors. pts_per_wavelength : int = 10 Number of points per wavelength with which to discretize the surface monitors for the projection. If ``None``, fields will not be resampled. origin : :class:`.Coordinate` Local origin used for defining observation points. If ``None``, uses the average of the centers of all surface monitors. """ if len(near_monitors) != len(normal_dirs): raise SetupError( f"Number of monitors ({len(near_monitors)}) does not equal " f"the number of directions ({len(normal_dirs)})." ) surfaces = [ FieldProjectionSurface(monitor=monitor, normal_dir=normal_dir) for monitor, normal_dir in zip(near_monitors, normal_dirs) ] return cls( sim_data=sim_data, surfaces=surfaces, pts_per_wavelength=pts_per_wavelength, origin=origin, )
@cached_property def currents(self): """Sets the surface currents.""" sim_data = self.sim_data surfaces = self.surfaces pts_per_wavelength = self.pts_per_wavelength medium = self.medium surface_currents = {} for surface in surfaces: current_data = self.compute_surface_currents( sim_data, surface, medium, pts_per_wavelength ) # shift source coordinates relative to the local origin for name, origin in zip(["x", "y", "z"], self.origin): current_data[name] = current_data[name] - origin surface_currents[surface.monitor.name] = current_data return surface_currents
[docs] @staticmethod def compute_surface_currents( sim_data: SimulationData, surface: FieldProjectionSurface, medium: MediumType, pts_per_wavelength: int = PTS_PER_WVL, ) -> xr.Dataset: """Returns resampled surface current densities associated with the surface monitor. Parameters ---------- sim_data : :class:`.SimulationData` Container for simulation data containing the near field monitors. surface: :class:`.FieldProjectionSurface` :class:`.FieldProjectionSurface` to use as source of near field. medium : :class:`.MediumType` Background medium through which to project fields. pts_per_wavelength : int = 10 Number of points per wavelength with which to discretize the surface monitors for the projection. If ``None``, fields will not be resampled, but will still be colocated. Returns ------- xarray.Dataset Colocated surface current densities for the given surface. """ monitor_name = surface.monitor.name if monitor_name not in sim_data.monitor_data.keys(): raise SetupError(f"No data for monitor named '{monitor_name}' found in sim_data.") field_data = sim_data[monitor_name] currents = FieldProjector._fields_to_currents(field_data, surface) currents = FieldProjector._resample_surface_currents( currents, sim_data, surface, medium, pts_per_wavelength ) return currents
@staticmethod def _fields_to_currents(field_data: FieldData, surface: FieldProjectionSurface) -> FieldData: """Returns surface current densities associated with a given :class:`.FieldData` object. Parameters ---------- field_data : :class:`.FieldData` Container for field data associated with the given near field surface. surface: :class:`.FieldProjectionSurface` :class:`.FieldProjectionSurface` to use as source of near field. Returns ------- :class:`.FieldData` Surface current densities for the given surface. """ # figure out which field components are tangential or normal to the monitor _, (cmp_1, cmp_2) = surface.monitor.pop_axis(("x", "y", "z"), axis=surface.axis) signs = np.array([-1, 1]) if surface.axis % 2 != 0: signs *= -1 if surface.normal_dir == "-": signs *= -1 E1 = "E" + cmp_1 E2 = "E" + cmp_2 H1 = "H" + cmp_1 H2 = "H" + cmp_2 surface_currents = {} surface_currents[E2] = field_data.field_components[H1] * signs[1] surface_currents[E1] = field_data.field_components[H2] * signs[0] surface_currents[H2] = field_data.field_components[E1] * signs[0] surface_currents[H1] = field_data.field_components[E2] * signs[1] new_monitor = surface.monitor.copy(update=dict(fields=[E1, E2, H1, H2])) return FieldData( monitor=new_monitor, symmetry=field_data.symmetry, symmetry_center=field_data.symmetry_center, grid_expanded=field_data.grid_expanded, **surface_currents, ) @staticmethod def _resample_surface_currents( currents: xr.Dataset, sim_data: SimulationData, surface: FieldProjectionSurface, medium: MediumType, pts_per_wavelength: int = PTS_PER_WVL, ) -> xr.Dataset: """Returns the surface current densities associated with the surface monitor. Parameters ---------- currents : xarray.Dataset Surface currents defined on the original Yee grid. sim_data : :class:`.SimulationData` Container for simulation data containing the near field monitors. surface: :class:`.FieldProjectionSurface` :class:`.FieldProjectionSurface` to use as source of near field. medium : :class:`.MediumType` Background medium through which to project fields. pts_per_wavelength : int = 10 Number of points per wavelength with which to discretize the surface monitors for the projection. If ``None``, fields will not be resampled, but will still be colocated. Returns ------- xarray.Dataset Colocated surface current densities for the given surface. """ # colocate surface currents on a regular grid of points on the monitor based on wavelength colocation_points = [None] * 3 colocation_points[surface.axis] = surface.monitor.center[surface.axis] # use the highest frequency associated with the monitor to resample the surface currents frequency = max(surface.monitor.freqs) eps_complex = medium.eps_model(frequency) index_n, _ = medium.eps_complex_to_nk(eps_complex) wavelength = C_0 / frequency / index_n _, idx_uv = surface.monitor.pop_axis((0, 1, 2), axis=surface.axis) for idx in idx_uv: # pick sample points on the monitor and handle the possibility of an "infinite" monitor start = np.maximum( surface.monitor.center[idx] - surface.monitor.size[idx] / 2.0, sim_data.simulation.center[idx] - sim_data.simulation.size[idx] / 2.0, ) stop = np.minimum( surface.monitor.center[idx] + surface.monitor.size[idx] / 2.0, sim_data.simulation.center[idx] + sim_data.simulation.size[idx] / 2.0, ) if pts_per_wavelength is None: points = sim_data.simulation.grid.boundaries.to_list[idx] points[np.argwhere(points < start)] = start points[np.argwhere(points > stop)] = stop colocation_points[idx] = np.unique(points) else: size = stop - start num_pts = int(np.ceil(pts_per_wavelength * size / wavelength)) points = np.linspace(start, stop, num_pts) colocation_points[idx] = points for idx, points in enumerate(colocation_points): if np.array(points).size == 1: colocation_points[idx] = None currents = currents.colocate(*colocation_points) return currents
[docs] def integrate_2d( self, function: np.ndarray, phase: np.ndarray, pts_u: np.ndarray, pts_v: np.ndarray, ): """Trapezoidal integration in two dimensions.""" return np.trapz(np.trapz(np.squeeze(function) * phase, pts_u, axis=0), pts_v, axis=0)
def _far_fields_for_surface( self, frequency: float, theta: ArrayLikeN2F, phi: ArrayLikeN2F, surface: FieldProjectionSurface, currents: xr.Dataset, medium: MediumType, ): """Compute far fields at an angle in spherical coordinates for a given set of surface currents and observation angles. Parameters ---------- frequency : float Frequency to select from each :class:`.FieldMonitor` to use for projection. Must be a frequency stored in each :class:`FieldMonitor`. theta : Union[float, Tuple[float, ...], np.ndarray] Polar angles (rad) downward from x=y=0 line relative to the local origin. phi : Union[float, Tuple[float, ...], np.ndarray] Azimuthal (rad) angles from y=z=0 line relative to the local origin. surface: :class:`FieldProjectionSurface` :class:`FieldProjectionSurface` object to use as source of near field. currents : xarray.Dataset xarray Dataset containing surface currents associated with the surface monitor. medium : :class:`.MediumType` Background medium through which to project fields. Returns ------- tuple(numpy.ndarray[float], ...) ``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi`` for the given surface. """ pts = [currents[name].values for name in ["x", "y", "z"]] try: currents_f = currents.sel(f=frequency) except Exception as e: raise SetupError( f"Frequency {frequency} not found in fields for monitor '{surface.monitor.name}'." ) from e idx_w, idx_uv = surface.monitor.pop_axis((0, 1, 2), axis=surface.axis) _, source_names = surface.monitor.pop_axis(("x", "y", "z"), axis=surface.axis) idx_u, idx_v = idx_uv cmp_1, cmp_2 = source_names theta = np.atleast_1d(theta) phi = np.atleast_1d(phi) sin_theta = np.sin(theta) cos_theta = np.cos(theta) sin_phi = np.sin(phi) cos_phi = np.cos(phi) J = np.zeros((3, len(theta), len(phi)), dtype=complex) M = np.zeros_like(J) phase = [None] * 3 propagation_factor = -1j * AbstractFieldProjectionData.wavenumber( medium=medium, frequency=frequency ) def integrate_for_one_theta(i_th: int): """Perform integration for a given theta angle index""" for j_ph in np.arange(len(phi)): phase[0] = np.exp(propagation_factor * pts[0] * sin_theta[i_th] * cos_phi[j_ph]) phase[1] = np.exp(propagation_factor * pts[1] * sin_theta[i_th] * sin_phi[j_ph]) phase[2] = np.exp(propagation_factor * pts[2] * cos_theta[i_th]) phase_ij = phase[idx_u][:, None] * phase[idx_v][None, :] * phase[idx_w] J[idx_u, i_th, j_ph] = self.integrate_2d( currents_f[f"E{cmp_1}"].values, phase_ij, pts[idx_u], pts[idx_v] ) J[idx_v, i_th, j_ph] = self.integrate_2d( currents_f[f"E{cmp_2}"].values, phase_ij, pts[idx_u], pts[idx_v] ) M[idx_u, i_th, j_ph] = self.integrate_2d( currents_f[f"H{cmp_1}"].values, phase_ij, pts[idx_u], pts[idx_v] ) M[idx_v, i_th, j_ph] = self.integrate_2d( currents_f[f"H{cmp_2}"].values, phase_ij, pts[idx_u], pts[idx_v] ) if len(theta) < 2: integrate_for_one_theta(0) else: for i_th in track( np.arange(len(theta)), description=f"Processing surface monitor '{surface.monitor.name}'...", console=get_logging_console(), ): integrate_for_one_theta(i_th) cos_th_cos_phi = cos_theta[:, None] * cos_phi[None, :] cos_th_sin_phi = cos_theta[:, None] * sin_phi[None, :] # Ntheta (8.33a) Ntheta = J[0] * cos_th_cos_phi + J[1] * cos_th_sin_phi - J[2] * sin_theta[:, None] # Nphi (8.33b) Nphi = -J[0] * sin_phi[None, :] + J[1] * cos_phi[None, :] # Ltheta (8.34a) Ltheta = M[0] * cos_th_cos_phi + M[1] * cos_th_sin_phi - M[2] * sin_theta[:, None] # Lphi (8.34b) Lphi = -M[0] * sin_phi[None, :] + M[1] * cos_phi[None, :] eta = ETA_0 / np.sqrt(medium.eps_model(frequency)) Etheta = -(Lphi + eta * Ntheta) Ephi = Ltheta - eta * Nphi Er = np.zeros_like(Ephi) Htheta = -Ephi / eta Hphi = Etheta / eta Hr = np.zeros_like(Hphi) return Er, Etheta, Ephi, Hr, Htheta, Hphi
[docs] @staticmethod def apply_window_to_currents( proj_monitor: AbstractFieldProjectionMonitor, currents: xr.Dataset ) -> xr.Dataset: """Apply windowing function to the surface currents.""" if proj_monitor.size.count(0.0) == 0: return currents if proj_monitor.window_size == (0, 0): return currents pts = [currents[name].values for name in ["x", "y", "z"]] custom_bounds = [ [pts[i][0] for i in range(3)], [pts[i][-1] for i in range(3)], ] window_size, window_minus, window_plus = proj_monitor.window_parameters( custom_bounds=custom_bounds ) new_currents = currents.copy(deep=True) for dim, (dim_name, points) in enumerate(zip("xyz", pts)): window_fn = proj_monitor.window_function( points=points, window_size=window_size, window_minus=window_minus, window_plus=window_plus, dim=dim, ) window_data = xr.DataArray( window_fn, dims=[dim_name], coords=[points], ) new_currents *= window_data return new_currents
[docs] def project_fields( self, proj_monitor: AbstractFieldProjectionMonitor ) -> AbstractFieldProjectionData: """Compute projected fields. Parameters ---------- proj_monitor : :class:`.AbstractFieldProjectionMonitor` Instance of :class:`.AbstractFieldProjectionMonitor` defining the projection observation grid. Returns ------- :class:`.AbstractFieldProjectionData` Data structure with ``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi``. """ if isinstance(proj_monitor, FieldProjectionAngleMonitor): return self._project_fields_angular(proj_monitor) if isinstance(proj_monitor, FieldProjectionCartesianMonitor): return self._project_fields_cartesian(proj_monitor) return self._project_fields_kspace(proj_monitor)
def _project_fields_angular( self, monitor: FieldProjectionAngleMonitor ) -> FieldProjectionAngleData: """Compute projected fields on an angle-based grid in spherical coordinates. Parameters ---------- monitor : :class:`.FieldProjectionAngleMonitor` Instance of :class:`.FieldProjectionAngleMonitor` defining the projection observation grid. Returns ------- :class:.`FieldProjectionAngleData` Data structure with ``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi``. """ freqs = np.atleast_1d(self.frequencies) theta = np.atleast_1d(monitor.theta) phi = np.atleast_1d(monitor.phi) # compute projected fields for the dataset associated with each monitor field_names = ("Er", "Etheta", "Ephi", "Hr", "Htheta", "Hphi") fields = [ np.zeros((1, len(theta), len(phi), len(freqs)), dtype=complex) for _ in field_names ] medium = monitor.medium if monitor.medium else self.medium k = AbstractFieldProjectionData.wavenumber(medium=medium, frequency=freqs) phase = np.atleast_1d( AbstractFieldProjectionData.propagation_phase(dist=monitor.proj_distance, k=k) ) for surface in self.surfaces: # apply windowing to currents currents = self.apply_window_to_currents(monitor, self.currents[surface.monitor.name]) if monitor.far_field_approx: for idx_f, frequency in enumerate(freqs): _fields = self._far_fields_for_surface( frequency=frequency, theta=theta, phi=phi, surface=surface, currents=currents, medium=medium, ) for field, _field in zip(fields, _fields): field[..., idx_f] += _field * phase[idx_f] else: iter_coords = [ ([_theta, _phi], [i, j]) for i, _theta in enumerate(theta) for j, _phi in enumerate(phi) ] for (_theta, _phi), (i, j) in track( iter_coords, description=f"Processing surface monitor '{surface.monitor.name}'...", console=get_logging_console(), ): _x, _y, _z = monitor.sph_2_car(monitor.proj_distance, _theta, _phi) _fields = self._fields_for_surface_exact( x=_x, y=_y, z=_z, surface=surface, currents=currents, medium=medium ) for field, _field in zip(fields, _fields): field[0, i, j, :] += _field coords = {"r": np.atleast_1d(monitor.proj_distance), "theta": theta, "phi": phi, "f": freqs} fields = { name: FieldProjectionAngleDataArray(field, coords=coords) for name, field in zip(field_names, fields) } return FieldProjectionAngleData( monitor=monitor, projection_surfaces=self.surfaces, medium=medium, **fields ) def _project_fields_cartesian( self, monitor: FieldProjectionCartesianMonitor ) -> FieldProjectionCartesianData: """Compute projected fields on a Cartesian grid in spherical coordinates. Parameters ---------- monitor : :class:`.FieldProjectionCartesianMonitor` Instance of :class:`.FieldProjectionCartesianMonitor` defining the projection observation grid. Returns ------- :class:.`FieldProjectionCartesianData` Data structure with ``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi``. """ freqs = np.atleast_1d(self.frequencies) x, y, z = monitor.unpop_axis( monitor.proj_distance, (monitor.x, monitor.y), axis=monitor.proj_axis ) x, y, z = list(map(np.atleast_1d, [x, y, z])) # compute projected fields for the dataset associated with each monitor field_names = ("Er", "Etheta", "Ephi", "Hr", "Htheta", "Hphi") fields = [ np.zeros((len(x), len(y), len(z), len(freqs)), dtype=complex) for _ in field_names ] medium = monitor.medium if monitor.medium else self.medium wavenumber = AbstractFieldProjectionData.wavenumber(medium=medium, frequency=freqs) # Zip together all combinations of observation points for better progress tracking iter_coords = [ ([_x, _y, _z], [i, j, k]) for i, _x in enumerate(x) for j, _y in enumerate(y) for k, _z in enumerate(z) ] for (_x, _y, _z), (i, j, k) in track( iter_coords, description="Computing projected fields", console=get_logging_console() ): r, theta, phi = monitor.car_2_sph(_x, _y, _z) phase = np.atleast_1d( AbstractFieldProjectionData.propagation_phase(dist=r, k=wavenumber) ) for surface in self.surfaces: # apply windowing to currents currents = self.apply_window_to_currents( monitor, self.currents[surface.monitor.name] ) if monitor.far_field_approx: for idx_f, frequency in enumerate(freqs): _fields = self._far_fields_for_surface( frequency=frequency, theta=theta, phi=phi, surface=surface, currents=currents, medium=medium, ) for field, _field in zip(fields, _fields): field[i, j, k, idx_f] += _field * phase[idx_f] else: _fields = self._fields_for_surface_exact( x=_x, y=_y, z=_z, surface=surface, currents=currents, medium=medium ) for field, _field in zip(fields, _fields): field[i, j, k, :] += _field coords = {"x": x, "y": y, "z": z, "f": freqs} fields = { name: FieldProjectionCartesianDataArray(field, coords=coords) for name, field in zip(field_names, fields) } return FieldProjectionCartesianData( monitor=monitor, projection_surfaces=self.surfaces, medium=medium, **fields ) def _project_fields_kspace( self, monitor: FieldProjectionKSpaceMonitor ) -> FieldProjectionKSpaceData: """Compute projected fields on a k-space grid in spherical coordinates. Parameters ---------- monitor : :class:`.FieldProjectionKSpaceMonitor` Instance of :class:`.FieldProjectionKSpaceMonitor` defining the projection observation grid. Returns ------- :class:.`FieldProjectionKSpaceData` Data structure with ``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi``. """ freqs = np.atleast_1d(self.frequencies) ux = np.atleast_1d(monitor.ux) uy = np.atleast_1d(monitor.uy) # compute projected fields for the dataset associated with each monitor field_names = ("Er", "Etheta", "Ephi", "Hr", "Htheta", "Hphi") fields = [np.zeros((len(ux), len(uy), 1, len(freqs)), dtype=complex) for _ in field_names] medium = monitor.medium if monitor.medium else self.medium k = AbstractFieldProjectionData.wavenumber(medium=medium, frequency=freqs) phase = np.atleast_1d( AbstractFieldProjectionData.propagation_phase(dist=monitor.proj_distance, k=k) ) # Zip together all combinations of observation points for better progress tracking iter_coords = [([_ux, _uy], [i, j]) for i, _ux in enumerate(ux) for j, _uy in enumerate(uy)] for (_ux, _uy), (i, j) in track( iter_coords, description="Computing projected fields", console=get_logging_console() ): theta, phi = monitor.kspace_2_sph(_ux, _uy, monitor.proj_axis) for surface in self.surfaces: # apply windowing to currents currents = self.apply_window_to_currents( monitor, self.currents[surface.monitor.name] ) if monitor.far_field_approx: for idx_f, frequency in enumerate(freqs): _fields = self._far_fields_for_surface( frequency=frequency, theta=theta, phi=phi, surface=surface, currents=currents, medium=medium, ) for field, _field in zip(fields, _fields): field[i, j, 0, idx_f] += _field * phase[idx_f] else: _x, _y, _z = monitor.sph_2_car(monitor.proj_distance, theta, phi) _fields = self._fields_for_surface_exact( x=_x, y=_y, z=_z, surface=surface, currents=currents, medium=medium ) for field, _field in zip(fields, _fields): field[i, j, 0, :] += _field coords = { "ux": np.array(monitor.ux), "uy": np.array(monitor.uy), "r": np.atleast_1d(monitor.proj_distance), "f": freqs, } fields = { name: FieldProjectionKSpaceDataArray(field, coords=coords) for name, field in zip(field_names, fields) } return FieldProjectionKSpaceData( monitor=monitor, projection_surfaces=self.surfaces, medium=medium, **fields ) """Exact projections""" def _fields_for_surface_exact( self, x: float, y: float, z: float, surface: FieldProjectionSurface, currents: xr.Dataset, medium: MediumType, ): """Compute projected fields in spherical coordinates at a given projection point on a Cartesian grid for a given set of surface currents using the exact homogeneous medium Green's function without geometric approximations. Parameters ---------- x : float Observation point x-coordinate (microns) relative to the local origin. y : float Observation point y-coordinate (microns) relative to the local origin. z : float Observation point z-coordinate (microns) relative to the local origin. surface: :class:`FieldProjectionSurface` :class:`FieldProjectionSurface` object to use as source of near field. currents : xarray.Dataset xarray Dataset containing surface currents associated with the surface monitor. medium : :class:`.MediumType` Background medium through which to project fields. Returns ------- tuple(np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray) ``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi`` projected fields for each frequency. """ freqs = np.array(self.frequencies) i_omega = 1j * 2.0 * np.pi * freqs[None, None, None, :] wavenumber = AbstractFieldProjectionData.wavenumber(frequency=freqs, medium=medium) wavenumber = wavenumber[None, None, None, :] # add space dimensions eps_complex = medium.eps_model(frequency=freqs) epsilon = EPSILON_0 * eps_complex[None, None, None, :] # source points pts = [currents[name].values for name in ["x", "y", "z"]] # transform the coordinate system so that the origin is at the source point # then the observation points in the new system are: x_new, y_new, z_new = (pt_obs - pt_src for pt_src, pt_obs in zip(pts, [x, y, z])) # tangential source components to use idx_w, idx_uv = surface.monitor.pop_axis((0, 1, 2), axis=surface.axis) _, source_names = surface.monitor.pop_axis(("x", "y", "z"), axis=surface.axis) idx_u, idx_v = idx_uv cmp_1, cmp_2 = source_names # set the surface current density Cartesian components J = [np.atleast_1d(0)] * 3 M = [np.atleast_1d(0)] * 3 J[idx_u] = currents[f"E{cmp_1}"].values J[idx_v] = currents[f"E{cmp_2}"].values J[idx_w] = np.zeros_like(J[idx_u]) M[idx_u] = currents[f"H{cmp_1}"].values M[idx_v] = currents[f"H{cmp_2}"].values M[idx_w] = np.zeros_like(M[idx_u]) # observation point in the new spherical system r, theta_obs, phi_obs = surface.monitor.car_2_sph( x_new[:, None, None, None], y_new[None, :, None, None], z_new[None, None, :, None] ) # angle terms sin_theta = np.sin(theta_obs) cos_theta = np.cos(theta_obs) sin_phi = np.sin(phi_obs) cos_phi = np.cos(phi_obs) # Green's function and terms related to its derivatives ikr = 1j * wavenumber * r G = np.exp(ikr) / (4.0 * np.pi * r) dG_dr = G * (ikr - 1.0) / r d2G_dr2 = dG_dr * (ikr - 1.0) / r + G / (r**2) # operations between unit vectors and currents def r_x_current(current: Tuple[np.ndarray, ...]) -> Tuple[np.ndarray, ...]: """Cross product between the r unit vector and the current.""" return [ sin_theta * sin_phi * current[2] - cos_theta * current[1], cos_theta * current[0] - sin_theta * cos_phi * current[2], sin_theta * cos_phi * current[1] - sin_theta * sin_phi * current[0], ] def r_dot_current(current: Tuple[np.ndarray, ...]) -> np.ndarray: """Dot product between the r unit vector and the current.""" return ( sin_theta * cos_phi * current[0] + sin_theta * sin_phi * current[1] + cos_theta * current[2] ) def r_dot_current_dtheta(current: Tuple[np.ndarray, ...]) -> np.ndarray: """Theta derivative of the dot product between the r unit vector and the current.""" return ( cos_theta * cos_phi * current[0] + cos_theta * sin_phi * current[1] - sin_theta * current[2] ) def r_dot_current_dphi_div_sin_theta(current: Tuple[np.ndarray, ...]) -> np.ndarray: """Phi derivative of the dot product between the r unit vector and the current, analytically divided by sin theta.""" return -sin_phi * current[0] + cos_phi * current[1] def grad_Gr_r_dot_current(current: Tuple[np.ndarray, ...]) -> Tuple[np.ndarray, ...]: """Gradient of the product of the gradient of the Green's function and the dot product between the r unit vector and the current.""" temp = [ d2G_dr2 * r_dot_current(current), dG_dr * r_dot_current_dtheta(current) / r, dG_dr * r_dot_current_dphi_div_sin_theta(current) / r, ] # convert to Cartesian coordinates return surface.monitor.sph_2_car_field(temp[0], temp[1], temp[2], theta_obs, phi_obs) def potential_terms(current: Tuple[np.ndarray, ...], const: complex): """Assemble vector potential and its derivatives.""" r_x_c = r_x_current(current) pot = [const * item * G for item in current] curl_pot = [const * item * dG_dr for item in r_x_c] grad_div_pot = grad_Gr_r_dot_current(current) grad_div_pot = [const * item for item in grad_div_pot] return pot, curl_pot, grad_div_pot # magnetic vector potential terms A, curl_A, grad_div_A = potential_terms(J, MU_0) # electric vector potential terms F, curl_F, grad_div_F = potential_terms(M, epsilon) # assemble the electric field components (Taflove 8.24, 8.27) e_x_integrand, e_y_integrand, e_z_integrand = ( i_omega * (a + grad_div_a / (wavenumber**2)) - curl_f / epsilon for a, grad_div_a, curl_f in zip(A, grad_div_A, curl_F) ) # assemble the magnetic field components (Taflove 8.25, 8.28) h_x_integrand, h_y_integrand, h_z_integrand = ( i_omega * (f + grad_div_f / (wavenumber**2)) + curl_a / MU_0 for f, grad_div_f, curl_a in zip(F, grad_div_F, curl_A) ) # integrate over the surface e_x = self.integrate_2d(e_x_integrand, 1.0, pts[idx_u], pts[idx_v]) e_y = self.integrate_2d(e_y_integrand, 1.0, pts[idx_u], pts[idx_v]) e_z = self.integrate_2d(e_z_integrand, 1.0, pts[idx_u], pts[idx_v]) h_x = self.integrate_2d(h_x_integrand, 1.0, pts[idx_u], pts[idx_v]) h_y = self.integrate_2d(h_y_integrand, 1.0, pts[idx_u], pts[idx_v]) h_z = self.integrate_2d(h_z_integrand, 1.0, pts[idx_u], pts[idx_v]) # observation point in the original spherical system _, theta_obs, phi_obs = surface.monitor.car_2_sph(x, y, z) # convert fields to the original spherical system e_r, e_theta, e_phi = surface.monitor.car_2_sph_field(e_x, e_y, e_z, theta_obs, phi_obs) h_r, h_theta, h_phi = surface.monitor.car_2_sph_field(h_x, h_y, h_z, theta_obs, phi_obs) return [e_r, e_theta, e_phi, h_r, h_theta, h_phi]