"""Near field to far field transformation plugin"""
from __future__ import annotations
from typing import Iterable, List, Tuple, Union
import autograd.numpy as anp
import numpy as np
import pydantic.v1 as pydantic
import xarray as xr
from rich.progress import track
from ..constants import C_0, EPSILON_0, ETA_0, MICROMETER, MU_0
from ..exceptions import SetupError
from ..log import get_logging_console
from .autograd.functions import add_at, trapz
from .base import Tidy3dBaseModel, cached_property, skip_if_fields_missing
from .data.data_array import (
FieldProjectionAngleDataArray,
FieldProjectionCartesianDataArray,
FieldProjectionKSpaceDataArray,
)
from .data.monitor_data import (
AbstractFieldProjectionData,
FieldData,
FieldProjectionAngleData,
FieldProjectionCartesianData,
FieldProjectionKSpaceData,
)
from .data.sim_data import SimulationData
from .medium import MediumType
from .monitor import (
AbstractFieldProjectionMonitor,
FieldMonitor,
FieldProjectionAngleMonitor,
FieldProjectionCartesianMonitor,
FieldProjectionKSpaceMonitor,
FieldProjectionSurface,
)
from .types import ArrayComplex4D, Coordinate, Direction
# Default number of points per wavelength in the background medium to use for resampling fields.
PTS_PER_WVL = 10
# Numpy float array and related array types
ArrayLikeN2F = Union[float, Tuple[float, ...], ArrayComplex4D]
[docs]
class FieldProjector(Tidy3dBaseModel):
"""
Projection of near fields to points on a given observation grid.
.. TODO make images to illustrate this
See Also
--------
:class:`FieldProjectionAngleMonitor
:class:`Monitor` that samples electromagnetic near fields in the frequency domain
and projects them at given observation angles.`
**Notebooks**:
* `Performing near field to far field projections <../../notebooks/FieldProjections.html>`_
"""
sim_data: SimulationData = pydantic.Field(
...,
title="Simulation data",
description="Container for simulation data containing the near field monitors.",
)
surfaces: Tuple[FieldProjectionSurface, ...] = pydantic.Field(
...,
title="Surface monitor with direction",
description="Tuple of each :class:`.FieldProjectionSurface` to use as source of "
"near field.",
)
pts_per_wavelength: Union[int, type(None)] = pydantic.Field(
PTS_PER_WVL,
title="Points per wavelength",
description="Number of points per wavelength in the background medium with which "
"to discretize the surface monitors for the projection. If ``None``, fields will "
"will not resampled, but will still be colocated.",
)
origin: Coordinate = pydantic.Field(
None,
title="Local origin",
description="Local origin used for defining observation points. If ``None``, uses the "
"average of the centers of all surface monitors.",
units=MICROMETER,
)
@cached_property
def is_2d_simulation(self) -> bool:
non_zero_dims = sum(1 for size in self.sim_data.simulation.size if size != 0)
return non_zero_dims == 2
[docs]
@pydantic.validator("origin", always=True)
@skip_if_fields_missing(["surfaces"])
def set_origin(cls, val, values):
"""Sets .origin as the average of centers of all surface monitors if not provided."""
if val is None:
surfaces = values.get("surfaces")
val = np.array([surface.monitor.center for surface in surfaces])
return tuple(np.mean(val, axis=0))
return val
@cached_property
def medium(self) -> MediumType:
"""Medium into which fields are to be projected."""
sim = self.sim_data.simulation
monitor = self.surfaces[0].monitor
return sim.monitor_medium(monitor)
@cached_property
def frequencies(self) -> List[float]:
"""Return the list of frequencies associated with the field monitors."""
return self.surfaces[0].monitor.freqs
[docs]
@classmethod
def from_near_field_monitors(
cls,
sim_data: SimulationData,
near_monitors: List[FieldMonitor],
normal_dirs: List[Direction],
pts_per_wavelength: int = PTS_PER_WVL,
origin: Coordinate = None,
):
"""Constructs :class:`FieldProjection` from a list of surface monitors and their directions.
Parameters
----------
sim_data : :class:`.SimulationData`
Container for simulation data containing the near field monitors.
near_monitors : List[:class:`.FieldMonitor`]
Tuple of :class:`.FieldMonitor` objects on which near fields will be sampled.
normal_dirs : List[:class:`.Direction`]
Tuple containing the :class:`.Direction` of the normal to each surface monitor
w.r.t. to the positive x, y or z unit vectors. Must have the same length as monitors.
pts_per_wavelength : int = 10
Number of points per wavelength with which to discretize the
surface monitors for the projection. If ``None``, fields will not be resampled.
origin : :class:`.Coordinate`
Local origin used for defining observation points. If ``None``, uses the
average of the centers of all surface monitors.
"""
if len(near_monitors) != len(normal_dirs):
raise SetupError(
f"Number of monitors ({len(near_monitors)}) does not equal "
f"the number of directions ({len(normal_dirs)})."
)
surfaces = [
FieldProjectionSurface(monitor=monitor, normal_dir=normal_dir)
for monitor, normal_dir in zip(near_monitors, normal_dirs)
]
return cls(
sim_data=sim_data,
surfaces=surfaces,
pts_per_wavelength=pts_per_wavelength,
origin=origin,
)
@cached_property
def currents(self):
"""Sets the surface currents."""
sim_data = self.sim_data
surfaces = self.surfaces
pts_per_wavelength = self.pts_per_wavelength
medium = self.medium
surface_currents = {}
for surface in surfaces:
current_data = self.compute_surface_currents(
sim_data, surface, medium, pts_per_wavelength
)
# shift source coordinates relative to the local origin
for name, origin in zip(["x", "y", "z"], self.origin):
current_data[name] = current_data[name] - origin
surface_currents[surface.monitor.name] = current_data
return surface_currents
[docs]
@staticmethod
def compute_surface_currents(
sim_data: SimulationData,
surface: FieldProjectionSurface,
medium: MediumType,
pts_per_wavelength: int = PTS_PER_WVL,
) -> xr.Dataset:
"""Returns resampled surface current densities associated with the surface monitor.
Parameters
----------
sim_data : :class:`.SimulationData`
Container for simulation data containing the near field monitors.
surface: :class:`.FieldProjectionSurface`
:class:`.FieldProjectionSurface` to use as source of near field.
medium : :class:`.MediumType`
Background medium through which to project fields.
pts_per_wavelength : int = 10
Number of points per wavelength with which to discretize the
surface monitors for the projection. If ``None``, fields will not be
resampled, but will still be colocated.
Returns
-------
xarray.Dataset
Colocated surface current densities for the given surface.
"""
monitor_name = surface.monitor.name
if monitor_name not in sim_data.monitor_data.keys():
raise SetupError(f"No data for monitor named '{monitor_name}' found in sim_data.")
field_data = sim_data[monitor_name]
currents = FieldProjector._fields_to_currents(field_data, surface)
currents = FieldProjector._resample_surface_currents(
currents, sim_data, surface, medium, pts_per_wavelength
)
return currents
@staticmethod
def _fields_to_currents(field_data: FieldData, surface: FieldProjectionSurface) -> FieldData:
"""Returns surface current densities associated with a given :class:`.FieldData` object.
Parameters
----------
field_data : :class:`.FieldData`
Container for field data associated with the given near field surface.
surface: :class:`.FieldProjectionSurface`
:class:`.FieldProjectionSurface` to use as source of near field.
Returns
-------
:class:`.FieldData`
Surface current densities for the given surface.
"""
# figure out which field components are tangential or normal to the monitor
_, (cmp_1, cmp_2) = surface.monitor.pop_axis(("x", "y", "z"), axis=surface.axis)
signs = np.array([-1, 1])
if surface.axis % 2 != 0:
signs *= -1
if surface.normal_dir == "-":
signs *= -1
E1 = "E" + cmp_1
E2 = "E" + cmp_2
H1 = "H" + cmp_1
H2 = "H" + cmp_2
surface_currents = {}
surface_currents[E2] = field_data.field_components[H1] * signs[1]
surface_currents[E1] = field_data.field_components[H2] * signs[0]
surface_currents[H2] = field_data.field_components[E1] * signs[0]
surface_currents[H1] = field_data.field_components[E2] * signs[1]
new_monitor = surface.monitor.copy(update=dict(fields=[E1, E2, H1, H2]))
return FieldData(
monitor=new_monitor,
symmetry=field_data.symmetry,
symmetry_center=field_data.symmetry_center,
grid_expanded=field_data.grid_expanded,
**surface_currents,
)
@staticmethod
def _resample_surface_currents(
currents: FieldData,
sim_data: SimulationData,
surface: FieldProjectionSurface,
medium: MediumType,
pts_per_wavelength: int = PTS_PER_WVL,
) -> xr.Dataset:
"""Returns the surface current densities associated with the surface monitor.
Parameters
----------
currents : :class:`.FieldData`
Surface currents defined on the original Yee grid.
sim_data : :class:`.SimulationData`
Container for simulation data containing the near field monitors.
surface: :class:`.FieldProjectionSurface`
:class:`.FieldProjectionSurface` to use as source of near field.
medium : :class:`.MediumType`
Background medium through which to project fields.
pts_per_wavelength : int = 10
Number of points per wavelength with which to discretize the
surface monitors for the projection. If ``None``, fields will not be
resampled, but will still be colocated.
Returns
-------
xarray.Dataset
Colocated surface current densities for the given surface.
"""
# colocate surface currents on a regular grid of points on the monitor based on wavelength
colocation_points = [None] * 3
colocation_points[surface.axis] = surface.monitor.center[surface.axis]
# use the highest frequency associated with the monitor to resample the surface currents
frequency = max(surface.monitor.freqs)
eps_complex = medium.eps_model(frequency)
index_n, _ = medium.eps_complex_to_nk(eps_complex)
wavelength = C_0 / frequency / index_n
_, idx_uv = surface.monitor.pop_axis((0, 1, 2), axis=surface.axis)
coord_list = sim_data.simulation.grid.boundaries.to_list
for idx in idx_uv:
# pick sample points on the monitor and handle the possibility of an "infinite" monitor
# Fields within PML regions are included, to match the server-side computation.
start = np.maximum(
surface.monitor.center[idx] - surface.monitor.size[idx] / 2.0,
coord_list[idx][0],
)
stop = np.minimum(
surface.monitor.center[idx] + surface.monitor.size[idx] / 2.0,
coord_list[idx][-1],
)
if pts_per_wavelength is None:
points = sim_data.simulation.grid.boundaries.to_list[idx]
points[np.argwhere(points < start)] = start
points[np.argwhere(points > stop)] = stop
colocation_points[idx] = np.unique(points)
else:
size = stop - start
num_pts = int(np.ceil(pts_per_wavelength * size / wavelength))
points = np.linspace(start, stop, num_pts)
colocation_points[idx] = points
for idx, points in enumerate(colocation_points):
if np.array(points).size <= 1:
colocation_points[idx] = None
currents = currents.colocate(*colocation_points)
return currents
[docs]
@staticmethod
def trapezoid(
ary: np.ndarray,
pts: Union[Iterable[np.ndarray], np.ndarray],
axes: Union[Iterable[int], int] = 0,
):
"""Trapezoidal integration in n dimensions.
Parameters
----------
ary : np.ndarray
Array to integrate.
pts : Iterable[np.ndarray]
Iterable of points for each dimension.
axes : Union[Iterable[int], int]
Iterable of axes along which to integrate. If not an iterable, assume 1D integration.
Returns
-------
np.ndarray
Integrated array.
"""
if not isinstance(axes, Iterable):
axes = [axes]
pts = [pts]
for idx, (axis, pt) in enumerate(zip(axes, pts)):
if ary.shape[axis - idx] > 1:
ary = trapz(ary, pt, axis=axis - idx)
else: # array has only one element along axis
ary = ary[(slice(None),) * (axis - idx) + (0,)]
return ary
def _far_fields_for_surface(
self,
frequency: float,
theta: ArrayLikeN2F,
phi: ArrayLikeN2F,
surface: FieldProjectionSurface,
currents: xr.Dataset,
medium: MediumType,
) -> np.ndarray:
"""Compute far fields at an angle in spherical coordinates
for a given set of surface currents and observation angles.
Parameters
----------
frequency : float
Frequency to select from each :class:`.FieldMonitor` to use for projection.
Must be a frequency stored in each :class:`FieldMonitor`.
theta : Union[float, Tuple[float, ...], np.ndarray]
Polar angles (rad) downward from x=y=0 line relative to the local origin.
phi : Union[float, Tuple[float, ...], np.ndarray]
Azimuthal (rad) angles from y=z=0 line relative to the local origin.
surface: :class:`FieldProjectionSurface`
:class:`FieldProjectionSurface` object to use as source of near field.
currents : xarray.Dataset
xarray Dataset containing surface currents associated with the surface monitor.
medium : :class:`.MediumType`
Background medium through which to project fields.
Returns
-------
np.ndarray
With leading dimension containing ``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi``
projected fields for each frequency.
"""
try:
currents_f = currents.sel(f=frequency)
except Exception as e:
raise SetupError(
f"Frequency {frequency} not found in fields for monitor '{surface.monitor.name}'."
) from e
idx_w, idx_uv = surface.monitor.pop_axis((0, 1, 2), axis=surface.axis)
_, source_names = surface.monitor.pop_axis(("x", "y", "z"), axis=surface.axis)
# integration dimension for 2d far field projection
zero_dim = [dim for dim, size in enumerate(self.sim_data.simulation.size) if size == 0]
if self.is_2d_simulation:
# Ensure zero_dim has a single element since {zero_dim} expects a value
if len(zero_dim) != 1:
raise ValueError("Expected exactly one dimension with size 0 for 2D simulation")
zero_dim = zero_dim[0]
integration_axis = {0, 1, 2} - {zero_dim, surface.axis}
idx_int_1d = integration_axis.pop()
idx_u, idx_v = idx_uv
cmp_1, cmp_2 = source_names
theta = np.atleast_1d(theta)
phi = np.atleast_1d(phi)
propagation_factor = -1j * AbstractFieldProjectionData.wavenumber(
medium=medium, frequency=frequency
)
sin_theta = np.sin(theta)
cos_theta = np.cos(theta)
sin_phi = np.sin(phi)
cos_phi = np.cos(phi)
pts = [currents[name].values for name in ["x", "y", "z"]]
phase_0 = np.exp(np.einsum("i,j,k->ijk", propagation_factor * pts[0], sin_theta, cos_phi))
phase_1 = np.exp(np.einsum("i,j,k->ijk", propagation_factor * pts[1], sin_theta, sin_phi))
phase_2 = np.exp(np.einsum("i,j->ij", propagation_factor * pts[2], cos_theta))
E1 = "E" + cmp_1
E2 = "E" + cmp_2
H1 = "H" + cmp_1
H2 = "H" + cmp_2
def contract(currents):
return anp.einsum("xtp,ytp,zt,xyz->xyztp", phase_0, phase_1, phase_2, currents)
jm = []
for field_component in (E1, E2, H1, H2):
currents = currents_f[field_component].data
currents = anp.reshape(currents, currents_f[field_component].shape)
currents_phase = contract(currents)
if self.is_2d_simulation:
jm_i = self.trapezoid(currents_phase, pts[idx_int_1d], idx_int_1d)
else:
jm_i = self.trapezoid(currents_phase, (pts[idx_u], pts[idx_v]), (idx_u, idx_v))
jm.append(anp.reshape(jm_i, (len(theta), len(phi))))
order = [idx_u, idx_v, idx_w]
zeros = np.zeros(jm[0].shape)
# for each index (0, 1, 2), if itβs in the first two elements of order,
# select the corresponding jm element for J or the offset element (+2) for M
J = anp.array([jm[order.index(i)] if i in order[:2] else zeros for i in range(3)])
M = anp.array([jm[order.index(i) + 2] if i in order[:2] else zeros for i in range(3)])
cos_theta_cos_phi = cos_theta[:, None] * cos_phi[None, :]
cos_theta_sin_phi = cos_theta[:, None] * sin_phi[None, :]
# Ntheta (8.33a)
Ntheta = J[0] * cos_theta_cos_phi + J[1] * cos_theta_sin_phi - J[2] * sin_theta[:, None]
# Nphi (8.33b)
Nphi = -J[0] * sin_phi[None, :] + J[1] * cos_phi[None, :]
# Ltheta (8.34a)
Ltheta = M[0] * cos_theta_cos_phi + M[1] * cos_theta_sin_phi - M[2] * sin_theta[:, None]
# Lphi (8.34b)
Lphi = -M[0] * sin_phi[None, :] + M[1] * cos_phi[None, :]
eta = ETA_0 / np.sqrt(medium.eps_model(frequency))
Etheta = -(Lphi + eta * Ntheta)
Ephi = Ltheta - eta * Nphi
Er = anp.zeros_like(Ephi)
Htheta = -Ephi / eta
Hphi = Etheta / eta
Hr = anp.zeros_like(Hphi)
return anp.array([Er, Etheta, Ephi, Hr, Htheta, Hphi])
[docs]
@staticmethod
def apply_window_to_currents(
proj_monitor: AbstractFieldProjectionMonitor, currents: xr.Dataset
) -> xr.Dataset:
"""Apply windowing function to the surface currents."""
if proj_monitor.size.count(0.0) == 0:
return currents
if proj_monitor.window_size == (0, 0):
return currents
pts = [currents[name].values for name in ["x", "y", "z"]]
custom_bounds = [
[pts[i][0] for i in range(3)],
[pts[i][-1] for i in range(3)],
]
window_size, window_minus, window_plus = proj_monitor.window_parameters(
custom_bounds=custom_bounds
)
new_currents = currents.copy(deep=True)
for dim, (dim_name, points) in enumerate(zip("xyz", pts)):
window_fn = proj_monitor.window_function(
points=points,
window_size=window_size,
window_minus=window_minus,
window_plus=window_plus,
dim=dim,
)
window_data = xr.DataArray(
window_fn,
dims=[dim_name],
coords=[points],
)
new_currents *= window_data
return new_currents
[docs]
def project_fields(
self, proj_monitor: AbstractFieldProjectionMonitor
) -> AbstractFieldProjectionData:
"""Compute projected fields.
Parameters
----------
proj_monitor : :class:`.AbstractFieldProjectionMonitor`
Instance of :class:`.AbstractFieldProjectionMonitor` defining the projection
observation grid.
Returns
-------
:class:`.AbstractFieldProjectionData`
Data structure with ``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi``.
"""
if isinstance(proj_monitor, FieldProjectionAngleMonitor):
return self._project_fields_angular(proj_monitor)
if isinstance(proj_monitor, FieldProjectionCartesianMonitor):
return self._project_fields_cartesian(proj_monitor)
return self._project_fields_kspace(proj_monitor)
def _project_fields_angular(
self, monitor: FieldProjectionAngleMonitor
) -> FieldProjectionAngleData:
"""Compute projected fields on an angle-based grid in spherical coordinates.
Parameters
----------
monitor : :class:`.FieldProjectionAngleMonitor`
Instance of :class:`.FieldProjectionAngleMonitor` defining the projection
observation grid.
Returns
-------
:class:.`FieldProjectionAngleData`
Data structure with ``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi``.
"""
freqs = np.atleast_1d(self.frequencies)
theta = np.atleast_1d(monitor.theta)
phi = np.atleast_1d(monitor.phi)
# compute projected fields for the dataset associated with each monitor
field_names = ("Er", "Etheta", "Ephi", "Hr", "Htheta", "Hphi")
fields = np.zeros((len(field_names), 1, len(theta), len(phi), len(freqs)), dtype=complex)
medium = monitor.medium if monitor.medium else self.medium
k = AbstractFieldProjectionData.wavenumber(medium=medium, frequency=freqs)
phase = np.atleast_1d(
AbstractFieldProjectionData.propagation_factor(
dist=monitor.proj_distance, k=k, is_2d_simulation=self.is_2d_simulation
)
)
for surface in self.surfaces:
# apply windowing to currents
currents = self.apply_window_to_currents(monitor, self.currents[surface.monitor.name])
if monitor.far_field_approx:
for idx_f, frequency in enumerate(freqs):
_fields = self._far_fields_for_surface(
frequency=frequency,
theta=theta,
phi=phi,
surface=surface,
currents=currents,
medium=medium,
)
fields = add_at(fields, [..., idx_f], _fields[:, None] * phase[idx_f])
else:
iter_coords = [
([_theta, _phi], [i, j])
for i, _theta in enumerate(theta)
for j, _phi in enumerate(phi)
]
for (_theta, _phi), (i, j) in track(
iter_coords,
description=f"Processing surface monitor '{surface.monitor.name}'...",
console=get_logging_console(),
):
_x, _y, _z = monitor.sph_2_car(monitor.proj_distance, _theta, _phi)
_fields = self._fields_for_surface_exact(
x=_x, y=_y, z=_z, surface=surface, currents=currents, medium=medium
)
where = (slice(None), 0, i, j)
_fields = anp.reshape(_fields, fields[where].shape)
fields = add_at(fields, where, _fields)
coords = {"r": np.atleast_1d(monitor.proj_distance), "theta": theta, "phi": phi, "f": freqs}
fields = {
name: FieldProjectionAngleDataArray(field, coords=coords)
for name, field in zip(field_names, fields)
}
return FieldProjectionAngleData(
monitor=monitor,
projection_surfaces=self.surfaces,
medium=medium,
is_2d_simulation=self.is_2d_simulation,
**fields,
)
def _project_fields_cartesian(
self, monitor: FieldProjectionCartesianMonitor
) -> FieldProjectionCartesianData:
"""Compute projected fields on a Cartesian grid in spherical coordinates.
Parameters
----------
monitor : :class:`.FieldProjectionCartesianMonitor`
Instance of :class:`.FieldProjectionCartesianMonitor` defining the projection
observation grid.
Returns
-------
:class:.`FieldProjectionCartesianData`
Data structure with ``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi``.
"""
freqs = np.atleast_1d(self.frequencies)
x, y, z = monitor.unpop_axis(
monitor.proj_distance, (monitor.x, monitor.y), axis=monitor.proj_axis
)
x, y, z = list(map(np.atleast_1d, [x, y, z]))
# compute projected fields for the dataset associated with each monitor
field_names = ("Er", "Etheta", "Ephi", "Hr", "Htheta", "Hphi")
fields = np.zeros((len(field_names), len(x), len(y), len(z), len(freqs)), dtype=complex)
medium = monitor.medium if monitor.medium else self.medium
wavenumber = AbstractFieldProjectionData.wavenumber(medium=medium, frequency=freqs)
# Zip together all combinations of observation points for better progress tracking
iter_coords = [
([_x, _y, _z], [i, j, k])
for i, _x in enumerate(x)
for j, _y in enumerate(y)
for k, _z in enumerate(z)
]
for (_x, _y, _z), (i, j, k) in track(
iter_coords, description="Computing projected fields", console=get_logging_console()
):
r, theta, phi = monitor.car_2_sph(_x, _y, _z)
phase = np.atleast_1d(
AbstractFieldProjectionData.propagation_factor(
dist=r, k=wavenumber, is_2d_simulation=self.is_2d_simulation
)
)
for surface in self.surfaces:
# apply windowing to currents
currents = self.apply_window_to_currents(
monitor, self.currents[surface.monitor.name]
)
if monitor.far_field_approx:
for idx_f, frequency in enumerate(freqs):
_fields = self._far_fields_for_surface(
frequency=frequency,
theta=theta,
phi=phi,
surface=surface,
currents=currents,
medium=medium,
)
where = (slice(None), i, j, k, idx_f)
_fields = anp.reshape(_fields, fields[where].shape)
fields = add_at(fields, where, _fields * phase[idx_f])
else:
_fields = self._fields_for_surface_exact(
x=_x, y=_y, z=_z, surface=surface, currents=currents, medium=medium
)
where = (slice(None), i, j, k)
_fields = anp.reshape(_fields, fields[where].shape)
fields = add_at(fields, where, _fields)
coords = {"x": x, "y": y, "z": z, "f": freqs}
fields = {
name: FieldProjectionCartesianDataArray(field, coords=coords)
for name, field in zip(field_names, fields)
}
return FieldProjectionCartesianData(
monitor=monitor, projection_surfaces=self.surfaces, medium=medium, **fields
)
def _project_fields_kspace(
self, monitor: FieldProjectionKSpaceMonitor
) -> FieldProjectionKSpaceData:
"""Compute projected fields on a k-space grid in spherical coordinates.
Parameters
----------
monitor : :class:`.FieldProjectionKSpaceMonitor`
Instance of :class:`.FieldProjectionKSpaceMonitor` defining the projection
observation grid.
Returns
-------
:class:.`FieldProjectionKSpaceData`
Data structure with ``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi``.
"""
freqs = np.atleast_1d(self.frequencies)
ux = np.atleast_1d(monitor.ux)
uy = np.atleast_1d(monitor.uy)
# compute projected fields for the dataset associated with each monitor
field_names = ("Er", "Etheta", "Ephi", "Hr", "Htheta", "Hphi")
fields = np.zeros((len(field_names), len(ux), len(uy), 1, len(freqs)), dtype=complex)
medium = monitor.medium if monitor.medium else self.medium
k = AbstractFieldProjectionData.wavenumber(medium=medium, frequency=freqs)
phase = np.atleast_1d(
AbstractFieldProjectionData.propagation_factor(
dist=monitor.proj_distance, k=k, is_2d_simulation=self.is_2d_simulation
)
)
# Zip together all combinations of observation points for better progress tracking
iter_coords = [([_ux, _uy], [i, j]) for i, _ux in enumerate(ux) for j, _uy in enumerate(uy)]
for (_ux, _uy), (i, j) in track(
iter_coords, description="Computing projected fields", console=get_logging_console()
):
theta, phi = monitor.kspace_2_sph(_ux, _uy, monitor.proj_axis)
for surface in self.surfaces:
# apply windowing to currents
currents = self.apply_window_to_currents(
monitor, self.currents[surface.monitor.name]
)
if monitor.far_field_approx:
for idx_f, frequency in enumerate(freqs):
_fields = self._far_fields_for_surface(
frequency=frequency,
theta=theta,
phi=phi,
surface=surface,
currents=currents,
medium=medium,
)
where = (slice(None), i, j, 0, idx_f)
_fields = anp.reshape(_fields, fields[where].shape)
fields = add_at(fields, where, _fields * phase[idx_f])
else:
_x, _y, _z = monitor.sph_2_car(monitor.proj_distance, theta, phi)
_fields = self._fields_for_surface_exact(
x=_x, y=_y, z=_z, surface=surface, currents=currents, medium=medium
)
where = (slice(None), i, j, 0)
_fields = anp.reshape(_fields, fields[where].shape)
fields = add_at(fields, where, _fields)
coords = {
"ux": np.array(monitor.ux),
"uy": np.array(monitor.uy),
"r": np.atleast_1d(monitor.proj_distance),
"f": freqs,
}
fields = {
name: FieldProjectionKSpaceDataArray(field, coords=coords)
for name, field in zip(field_names, fields)
}
return FieldProjectionKSpaceData(
monitor=monitor, projection_surfaces=self.surfaces, medium=medium, **fields
)
"""Exact projections"""
def _fields_for_surface_exact(
self,
x: float,
y: float,
z: float,
surface: FieldProjectionSurface,
currents: xr.Dataset,
medium: MediumType,
) -> np.ndarray:
"""Compute projected fields in spherical coordinates at a given projection point on a
Cartesian grid for a given set of surface currents using the exact homogeneous medium
Green's function without geometric approximations.
Parameters
----------
x : float
Observation point x-coordinate (microns) relative to the local origin.
y : float
Observation point y-coordinate (microns) relative to the local origin.
z : float
Observation point z-coordinate (microns) relative to the local origin.
surface: :class:`FieldProjectionSurface`
:class:`FieldProjectionSurface` object to use as source of near field.
currents : xarray.Dataset
xarray Dataset containing surface currents associated with the surface monitor.
medium : :class:`.MediumType`
Background medium through which to project fields.
Returns
-------
np.ndarray
With leading dimension containing ``Er``, ``Etheta``, ``Ephi``, ``Hr``, ``Htheta``, ``Hphi``
projected fields for each frequency.
"""
freqs = anp.array(self.frequencies)
i_omega = 1j * 2.0 * np.pi * freqs[None, None, None, :]
wavenumber = AbstractFieldProjectionData.wavenumber(frequency=freqs, medium=medium)
wavenumber = wavenumber[None, None, None, :] # add space dimensions
eps_complex = medium.eps_model(frequency=freqs)
epsilon = EPSILON_0 * eps_complex[None, None, None, :]
# source points
pts = [currents[name].values for name in ["x", "y", "z"]]
# transform the coordinate system so that the origin is at the source point
# then the observation points in the new system are:
x_new, y_new, z_new = (pt_obs - pt_src for pt_src, pt_obs in zip(pts, [x, y, z]))
# tangential source components to use
idx_w, idx_uv = surface.monitor.pop_axis((0, 1, 2), axis=surface.axis)
_, source_names = surface.monitor.pop_axis(("x", "y", "z"), axis=surface.axis)
idx_u, idx_v = idx_uv
cmp_1, cmp_2 = source_names
# set the surface current density Cartesian components
J = [None] * 3
M = [None] * 3
J[idx_u] = currents[f"E{cmp_1}"].data
J[idx_v] = currents[f"E{cmp_2}"].data
J[idx_w] = anp.zeros(J[idx_u].shape)
M[idx_u] = currents[f"H{cmp_1}"].data
M[idx_v] = currents[f"H{cmp_2}"].data
M[idx_w] = anp.zeros(M[idx_u].shape)
# observation point in the new spherical system
r, theta_obs, phi_obs = surface.monitor.car_2_sph(
x_new[:, None, None, None], y_new[None, :, None, None], z_new[None, None, :, None]
)
# angle terms
sin_theta = anp.sin(theta_obs)
cos_theta = anp.cos(theta_obs)
sin_phi = anp.sin(phi_obs)
cos_phi = anp.cos(phi_obs)
# Green's function and terms related to its derivatives
ikr = 1j * wavenumber * r
G = anp.exp(ikr) / (4.0 * np.pi * r)
dG_dr = G * (ikr - 1.0) / r
d2G_dr2 = dG_dr * (ikr - 1.0) / r + G / (r**2)
# operations between unit vectors and currents
def r_x_current(current: Tuple[np.ndarray, ...]) -> Tuple[np.ndarray, ...]:
"""Cross product between the r unit vector and the current."""
return [
sin_theta * sin_phi * current[2] - cos_theta * current[1],
cos_theta * current[0] - sin_theta * cos_phi * current[2],
sin_theta * cos_phi * current[1] - sin_theta * sin_phi * current[0],
]
def r_dot_current(current: Tuple[np.ndarray, ...]) -> np.ndarray:
"""Dot product between the r unit vector and the current."""
return (
sin_theta * cos_phi * current[0]
+ sin_theta * sin_phi * current[1]
+ cos_theta * current[2]
)
def r_dot_current_dtheta(current: Tuple[np.ndarray, ...]) -> np.ndarray:
"""Theta derivative of the dot product between the r unit vector and the current."""
return (
cos_theta * cos_phi * current[0]
+ cos_theta * sin_phi * current[1]
- sin_theta * current[2]
)
def r_dot_current_dphi_div_sin_theta(current: Tuple[np.ndarray, ...]) -> np.ndarray:
"""Phi derivative of the dot product between the r unit vector and the current,
analytically divided by sin theta."""
return -sin_phi * current[0] + cos_phi * current[1]
def grad_Gr_r_dot_current(current: Tuple[np.ndarray, ...]) -> Tuple[np.ndarray, ...]:
"""Gradient of the product of the gradient of the Green's function and the dot product
between the r unit vector and the current."""
temp = [
d2G_dr2 * r_dot_current(current),
dG_dr * r_dot_current_dtheta(current) / r,
dG_dr * r_dot_current_dphi_div_sin_theta(current) / r,
]
# convert to Cartesian coordinates
return surface.monitor.sph_2_car_field(temp[0], temp[1], temp[2], theta_obs, phi_obs)
def potential_terms(current: Tuple[np.ndarray, ...], const: complex):
"""Assemble vector potential and its derivatives."""
r_x_c = r_x_current(current)
pot = [const * item * G for item in current]
curl_pot = [const * item * dG_dr for item in r_x_c]
grad_div_pot = grad_Gr_r_dot_current(current)
grad_div_pot = [const * item for item in grad_div_pot]
return pot, curl_pot, grad_div_pot
# magnetic vector potential terms
A, curl_A, grad_div_A = potential_terms(J, MU_0)
# electric vector potential terms
F, curl_F, grad_div_F = potential_terms(M, epsilon)
# assemble the electric field components (Taflove 8.24, 8.27)
e_x_integrand, e_y_integrand, e_z_integrand = (
i_omega * (a + grad_div_a / (wavenumber**2)) - curl_f / epsilon
for a, grad_div_a, curl_f in zip(A, grad_div_A, curl_F)
)
# assemble the magnetic field components (Taflove 8.25, 8.28)
h_x_integrand, h_y_integrand, h_z_integrand = (
i_omega * (f + grad_div_f / (wavenumber**2)) + curl_a / MU_0
for f, grad_div_f, curl_a in zip(F, grad_div_F, curl_A)
)
# integrate over the surface
e_x = self.trapezoid(e_x_integrand, (pts[idx_u], pts[idx_v]), (idx_u, idx_v))
e_y = self.trapezoid(e_y_integrand, (pts[idx_u], pts[idx_v]), (idx_u, idx_v))
e_z = self.trapezoid(e_z_integrand, (pts[idx_u], pts[idx_v]), (idx_u, idx_v))
h_x = self.trapezoid(h_x_integrand, (pts[idx_u], pts[idx_v]), (idx_u, idx_v))
h_y = self.trapezoid(h_y_integrand, (pts[idx_u], pts[idx_v]), (idx_u, idx_v))
h_z = self.trapezoid(h_z_integrand, (pts[idx_u], pts[idx_v]), (idx_u, idx_v))
# observation point in the original spherical system
_, theta_obs, phi_obs = surface.monitor.car_2_sph(x, y, z)
# convert fields to the original spherical system
e_r, e_theta, e_phi = surface.monitor.car_2_sph_field(e_x, e_y, e_z, theta_obs, phi_obs)
h_r, h_theta, h_phi = surface.monitor.car_2_sph_field(h_x, h_y, h_z, theta_obs, phi_obs)
return anp.array([e_r, e_theta, e_phi, h_r, h_theta, h_phi])