"""Study objects for dipole emission analysis."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Literal
import numpy as np
from pydantic import Field, PositiveFloat, field_validator, model_validator
from tidy3d.components.base import Tidy3dBaseModel, cached_property
from tidy3d.components.data.data_array import PointDataArray, SphericalAngleDataArray
from tidy3d.components.data.point_cloud import canonicalize_point_cloud_points
from tidy3d.components.geometry.base import Box
from tidy3d.components.monitor import DIPOLE_EMISSION_DIPOLE_AXES, DipoleEmissionMonitor
from tidy3d.components.simulation import (
MAX_SIMULATION_DATA_SIZE_GB,
WARN_MONITOR_DATA_SIZE_GB,
Simulation,
)
from tidy3d.components.source.field import TFSF, FixedAngleSpec
from tidy3d.components.structure import MeshOverrideStructure
from tidy3d.components.types import ArrayFloat1D, ArrayFloat2D, Axis
from tidy3d.components.types.time import SourceTimeType
from tidy3d.components.validators import points_outside_bounds
from tidy3d.constants import MICROMETER
from tidy3d.exceptions import SetupError, ValidationError
from tidy3d.log import log
from tidy3d.packaging import check_tidy3d_extras_licensed_feature
from tidy3d.plugins.dipole_emission.constants import (
EMISSION_MONITOR_NAME,
MESH_OVERRIDE_NAME,
RESERVED_NAME_PREFIX,
SOURCE_NAME_PREFIX,
)
from tidy3d.plugins.dipole_emission.data_array import (
DipoleEmissionStudyDataArray,
DipoleEmissionStudyPositionDataArray,
)
if TYPE_CHECKING:
from collections.abc import Sequence
Direction = Literal["+", "-"]
Polarization = Literal["p", "s"]
FEATURE_NAME = "dipole_emission"
[docs]
class EmissionAnalysisRegion(Box):
"""Region over which dipole emission is evaluated.
The region should contain the sampled emitters and the part of the optical
structure whose scattering response contributes to the collected radiation.
The ``normal_axis`` and ``direction`` fields select the collection half-space
used to interpret far-field angles. For example, ``normal_axis=2`` and
``direction="+"`` describes radiation collected toward ``+z``. Generated
simulations require that collection side to be represented by a single
background medium with real, nondispersive refractive index over the
requested frequencies. The ``dl`` field controls the spatial resolution in
the plane transverse to the collection direction.
"""
normal_axis: Axis = Field(
title="Normal Axis",
description="Coordinate axis normal to the selected collection side.",
)
direction: Direction = Field(
title="Collection Direction",
description="Side of the structure, along ``normal_axis``, where emission is collected.",
)
dl: PositiveFloat | tuple[PositiveFloat, PositiveFloat] = Field(
title="Transverse Grid Size",
description=(
"Spatial resolution along axes transverse to ``normal_axis``. A scalar applies "
"to both transverse axes; a pair sets the two transverse resolutions independently."
),
json_schema_extra={"units": MICROMETER},
)
@model_validator(mode="after")
def _validate_box_size(self) -> EmissionAnalysisRegion:
"""Require a finite, positive 3D analysis box."""
size = np.asarray(self.size, dtype=float)
if size.shape != (3,) or not np.all(np.isfinite(size)):
raise ValidationError("'analysis_region.size' must contain three finite values.")
if np.any(size <= 0):
raise ValidationError("'analysis_region.size' must be positive along all axes.")
return self
@property
def tangential_axes(self) -> tuple[int, int]:
"""Axes transverse to ``normal_axis``."""
return tuple(axis for axis in range(3) if axis != self.normal_axis)
@cached_property
def geometry(self) -> Box:
""":class:`~tidy3d.Box` representation of the analysis region."""
return Box(center=self.center, size=self.size)
[docs]
def mesh_override(self, name: str) -> MeshOverrideStructure:
"""Create the transverse-resolution override for this region."""
dl_values = [None, None, None]
transverse_dl = self.dl if isinstance(self.dl, tuple) else (self.dl, self.dl)
for axis, dl in zip(self.tangential_axes, transverse_dl):
dl_values[axis] = dl
return MeshOverrideStructure(
geometry=self.geometry,
dl=tuple(dl_values),
enforce=True,
name=name,
)
[docs]
class DipoleEmissionStudy(Tidy3dBaseModel):
"""Dipole-emission sweep definition.
A study evaluates radiative emission from classical electric dipoles in a
passive optical environment. It reports angular radiation intensity per
squared dipole moment for x-, y-, and z-oriented dipoles after summing over
the sampled dipole positions using ``position_weights``. Optional
position-resolved samples may be retained for selected position indexes. The
result describes radiation into the selected collection half-space;
non-radiative loss, trapped guided modes, and total Purcell-factor
accounting are not included.
"""
base_sim: Simulation = Field(
title="Base Simulation",
description=(
"Source-free simulation describing the passive optical structure. Monitors in "
"this simulation are available only through optional diagnostic batch data."
),
)
analysis_region: EmissionAnalysisRegion = Field(
title="Analysis Region",
description=(
"Region containing the sampled dipoles and defining the far-field collection side."
),
)
positions: PointDataArray = Field(
title="Dipole Positions",
description="Sampled dipole locations as a ``PointDataArray`` with dims ``('index', 'axis')``.",
)
position_weights: ArrayFloat1D | ArrayFloat2D | None = Field(
None,
title="Position Weights",
description=(
"Weights used when summing radiation intensity over ``positions``. Provide one "
"weight per position, or one weight per position and Cartesian dipole axis. If "
"omitted, all sampled positions have unit weight."
),
)
store_position_indexes: tuple[int, ...] = Field(
(),
title="Stored Position Indexes",
description=(
"Zero-based indexes into ``positions`` for individual position-resolved radiation "
"intensity samples to retain. The default stores only the summed result."
),
)
source_time: SourceTimeType = Field(
title="Source Time",
description=(
"Spectral envelope for the emission calculation. The envelope is included in "
"``radiation_intensity`` and divided out by the "
"``radiation_intensity_transfer(...)`` method."
),
)
freqs: ArrayFloat1D = Field(
title="Frequencies",
description="Frequencies at which the emission response is evaluated.",
)
angles: SphericalAngleDataArray = Field(
title="Emission Angles",
description=(
"Far-field observation directions as ``(theta, phi)`` pairs in radians, with "
"``theta`` measured away from the selected collection normal and restricted to "
"``abs(theta) < pi/2``."
),
)
polarizations: tuple[Polarization, ...] = Field(
default=("p", "s"),
title="Emission Polarizations",
description=(
"Far-field polarization components to evaluate at each observation angle. The "
"``p`` component lies in the plane formed by the observation direction and "
"collection normal; ``s`` is orthogonal to that plane."
),
)
@field_validator("positions")
@classmethod
def _validate_positions(cls, val: PointDataArray) -> PointDataArray:
"""Validate sampled dipole positions."""
try:
return canonicalize_point_cloud_points(
val,
empty_error="'positions' must contain at least one point.",
require_real=True,
require_finite=True,
cast_to_float=True,
coordinate_name="'positions'",
preserve_index=True,
)
except ValueError as exc:
raise ValidationError(str(exc)) from None
@field_validator("store_position_indexes")
@classmethod
def _validate_store_position_indexes(cls, val: Sequence[int]) -> tuple[int, ...]:
"""Validate optional stored position indexes."""
indexes = tuple(int(index) for index in val)
if len(set(indexes)) != len(indexes):
raise ValidationError("'store_position_indexes' must not contain duplicates.")
if any(index < 0 for index in indexes):
raise ValidationError("'store_position_indexes' entries must be nonnegative.")
return indexes
@field_validator("freqs")
@classmethod
def _validate_freqs(cls, val: ArrayFloat1D) -> ArrayFloat1D:
"""Validate positive output frequencies."""
freqs = np.asarray(val, dtype=float)
if freqs.ndim != 1 or len(freqs) == 0:
raise ValidationError("'freqs' must be a non-empty 1D array.")
if not np.all(np.isfinite(freqs)) or np.any(freqs <= 0):
raise ValidationError("'freqs' must contain only finite positive values.")
if len(np.unique(freqs)) != len(freqs):
raise ValidationError("'freqs' must be unique.")
return freqs
@field_validator("polarizations")
@classmethod
def _validate_polarizations(cls, val: Sequence[Polarization]) -> tuple[Polarization, ...]:
"""Validate requested emission-polarization labels."""
if len(val) == 0:
raise ValidationError("'polarizations' must contain at least one entry.")
if len(set(val)) != len(val):
raise ValidationError("'polarizations' must not contain duplicates.")
return tuple(val)
@field_validator("angles")
@classmethod
def _validate_angles(cls, val: SphericalAngleDataArray) -> SphericalAngleDataArray:
"""Validate far-field observation angles."""
values = np.asarray(val.values)
if np.iscomplexobj(values):
raise ValidationError("'angles' must contain real values.")
try:
data = np.asarray(values, dtype=float)
except (TypeError, ValueError):
raise ValidationError("'angles' must contain real values.") from None
if data.ndim != 2 or data.shape[1] != 2 or data.shape[0] == 0:
raise ValidationError("'angles' must have shape '(N, 2)' with columns '(theta, phi)'.")
if not np.all(np.isfinite(data)):
raise ValidationError("'angles' must contain only finite values.")
if np.any(np.abs(data[:, 0]) >= np.pi / 2):
raise ValidationError("'angles' theta values must satisfy abs(theta) < pi/2.")
return val
@model_validator(mode="after")
def _validate_study(self) -> DipoleEmissionStudy:
"""Validate study-level invariants that span fields."""
if len(self.base_sim.sources) != 0:
self._raise_validation_error_at_loc(
"'base_sim' must not contain sources.", "base_sim", "sources"
)
if self.base_sim.symmetry != (0, 0, 0):
self._raise_validation_error_at_loc(
"'base_sim.symmetry' must be '(0, 0, 0)' for dipole emission studies.",
"base_sim",
"symmetry",
)
for monitor_index, monitor in enumerate(self.base_sim.monitors):
if monitor.name.startswith(RESERVED_NAME_PREFIX):
self._raise_validation_error_at_loc(
f"Monitor name '{monitor.name}' conflicts with the reserved "
f"dipole-emission prefix '{RESERVED_NAME_PREFIX}'.",
"base_sim",
"monitors",
monitor_index,
"name",
)
points = np.asarray(self.positions.values, dtype=float)
sim_strict_axes = np.asarray([size != 0 for size in self.base_sim.size], dtype=bool)
if np.any(points_outside_bounds(points, self.base_sim.bounds, sim_strict_axes)):
self._raise_validation_error_at_loc(
"'positions' must lie inside the simulation domain.", "positions"
)
region_strict_axes = np.asarray(
[size != 0 for size in self.analysis_region.size], dtype=bool
)
if np.any(points_outside_bounds(points, self.analysis_region.bounds, region_strict_axes)):
self._raise_validation_error_at_loc(
"'positions' must lie inside 'analysis_region'.", "positions"
)
self._call_with_validation_loc(("source_time",), self._validate_source_spectrum)
self._call_with_validation_loc(("position_weights",), self._validate_position_weights)
self._call_with_validation_loc(
("store_position_indexes",), self._validate_stored_position_index_range
)
self._validate_study_batch_data_size()
return self
def _validate_position_weights(self) -> None:
"""Validate position weights against the sampled positions."""
if self.position_weights is None:
return
weights = np.asarray(self.position_weights, dtype=float)
point_shape = np.asarray(self.positions.values).shape
if weights.shape not in ((self.positions.sizes["index"],), point_shape):
raise ValidationError(
"'position_weights' must have one value per position or match the shape of "
"'positions'."
)
if not np.all(np.isfinite(weights)) or np.any(weights < 0):
raise ValidationError("'position_weights' must contain finite nonnegative values.")
if not np.any(weights > 0):
raise ValidationError("'position_weights' must not be all zero (no emitters).")
def _validate_source_spectrum(self) -> None:
"""Validate source spectrum required by transfer normalization."""
_ = self.pulse_spectrum_abs2
def _validate_stored_position_index_range(self) -> None:
"""Validate stored position indexes against the sampled positions."""
if not self.store_position_indexes:
return
num_positions = self.positions.sizes["index"]
if max(self.store_position_indexes) >= num_positions:
raise ValidationError(
"'store_position_indexes' entries must be valid position indexes."
)
def _validate_study_batch_data_size(self) -> None:
"""Warn or error when estimated study batch data is too large to download."""
num_tasks = len(self._expected_task_names())
precision_factor = 2 if self.base_sim.precision == "double" else 1
intrinsic_size_gb = (
self._emission_monitor().storage_size(num_cells=0, tmesh=[])
* precision_factor
* num_tasks
/ 1e9
)
diagnostic_size_gb = 0.0
with log as consolidated_logger:
if intrinsic_size_gb > WARN_MONITOR_DATA_SIZE_GB:
consolidated_logger.warning(
f"Dipole-emission results are estimated to download "
f"{intrinsic_size_gb:1.2f}GB across the study batch. Consider reducing "
"'store_position_indexes', 'angles', 'polarizations', or 'freqs'."
)
for monitor_index, (monitor_name, monitor_size) in enumerate(
self.base_sim.monitors_data_size.items()
):
repeated_monitor_size_gb = monitor_size * num_tasks / 1e9
if repeated_monitor_size_gb > WARN_MONITOR_DATA_SIZE_GB:
consolidated_logger.warning(
f"Diagnostic monitor '{monitor_name}' is estimated to download "
f"{repeated_monitor_size_gb:1.2f}GB across the dipole-emission batch. "
"Consider removing it from 'base_sim' or reducing its stored data.",
custom_loc=["base_sim", "monitors", monitor_index],
)
diagnostic_size_gb += repeated_monitor_size_gb
total_size_gb = intrinsic_size_gb + diagnostic_size_gb
if total_size_gb > MAX_SIMULATION_DATA_SIZE_GB:
error_loc = (
("base_sim", "monitors")
if diagnostic_size_gb > intrinsic_size_gb
else ("store_position_indexes",)
)
self._raise_validation_error_at_loc(
f"Dipole-emission study data is estimated to download {total_size_gb:.2f}GB "
f"across the study batch, including {intrinsic_size_gb:.2f}GB from "
f"dipole-emission results and {diagnostic_size_gb:.2f}GB from diagnostic "
f"monitors; a maximum of {MAX_SIMULATION_DATA_SIZE_GB:.2f}GB is allowed. "
"Consider reducing 'store_position_indexes', 'angles', 'polarizations', "
"or 'freqs', or removing diagnostic monitors from 'base_sim'.",
*error_loc,
)
def _check_license(self) -> None:
"""Check that the local dipole-emission feature is licensed."""
check_tidy3d_extras_licensed_feature(FEATURE_NAME)
@property
def pulse_spectrum_abs2(self) -> ArrayFloat1D:
"""Squared source spectrum magnitude sampled at ``freqs``."""
amps = np.asarray([self.source_time.amp_freq(freq) for freq in self.freqs], dtype=complex)
values = np.abs(amps) ** 2
if not np.all(np.isfinite(values)) or np.any(values <= 0):
raise SetupError(
"Dipole emission requires a nonzero finite source spectrum at every output frequency."
)
return values
@property
def position_weights_array(self) -> ArrayFloat1D | ArrayFloat2D:
"""Weights used to sum radiation intensity over sampled positions and axes."""
if self.position_weights is None:
return np.ones(self.positions.sizes["index"], dtype=float)
return np.asarray(self.position_weights, dtype=float)
@staticmethod
def _opposite_direction(direction: Direction) -> Direction:
"""Reciprocal-probe direction into the analysis region."""
return "-" if direction == "+" else "+"
@staticmethod
def _task_name(angle_index: int, polarization: Polarization) -> str:
"""Generated batch task name for one reciprocity probe."""
return f"{RESERVED_NAME_PREFIX}angle{angle_index:03d}_{polarization}"
@staticmethod
def _source_name(angle_index: int, polarization: Polarization) -> str:
"""Study source name for one angle and polarization."""
return f"{SOURCE_NAME_PREFIX}_angle{angle_index:03d}_{polarization}"
def _angle_values(self) -> tuple[np.ndarray, np.ndarray]:
"""Return stored ``(theta, phi)`` arrays in study order."""
return (
np.asarray(self.angles.sel(spherical_coordinate="theta").values, dtype=float),
np.asarray(self.angles.sel(spherical_coordinate="phi").values, dtype=float),
)
def _expected_task_names(self) -> tuple[tuple[str, int, Polarization], ...]:
"""Expected task names in reduction stacking order."""
return tuple(
(self._task_name(angle_index, polarization), angle_index, polarization)
for angle_index in range(self.angles.sizes["index"])
for polarization in self.polarizations
)
def _source(
self, angle_index: int, theta: float, phi: float, polarization: Polarization
) -> TFSF:
"""Build the reciprocity probe source for one angle and polarization."""
return TFSF(
center=self.analysis_region.center,
size=self.analysis_region.size,
source_time=self.source_time,
direction=self._opposite_direction(self.analysis_region.direction),
injection_axis=self.analysis_region.normal_axis,
angle_theta=float(theta),
angle_phi=float(phi),
pol_angle=0.0 if polarization == "p" else np.pi / 2,
angular_spec=FixedAngleSpec(),
name=self._source_name(angle_index, polarization),
)
def _emission_monitor(self) -> DipoleEmissionMonitor:
"""Build the monitor used to store reduced emission data."""
return DipoleEmissionMonitor(
points=self.positions,
freqs=self.freqs,
position_weights=self.position_weights_array,
store_position_indexes=self.store_position_indexes,
name=EMISSION_MONITOR_NAME,
)
def _result_coords(self) -> dict[str, Any]:
"""Coordinates for compact study-level emission arrays.
The ``angle`` dimension carries only integer indices; the corresponding
``(theta, phi)`` directions live solely in ``self.angles`` to avoid
duplicating angle metadata that the HDF5 round trip would not preserve.
"""
return {
"dipole_axis": list(DIPOLE_EMISSION_DIPOLE_AXES),
"polarization": list(self.polarizations),
"angle": np.arange(self.angles.sizes["index"]),
"f": self.freqs,
}
def _position_result_coords(self) -> dict[str, Any]:
"""Coordinates for selected position-resolved study-level emission arrays."""
coords = self._result_coords()
position_index_values = np.asarray(self.positions.coords["index"].values)
coords["index"] = position_index_values[list(self.store_position_indexes)]
return coords
def _monitor_data_from_batch_data(self, batch_data: Any) -> dict[tuple[int, Polarization], Any]:
"""Extract reduced dipole-emission monitor data from user-managed ``BatchData``."""
monitor_data_by_task: dict[tuple[int, Polarization], Any] = {}
for task_name, angle_index, polarization in self._expected_task_names():
try:
sim_data = batch_data[task_name]
except KeyError as exc:
raise SetupError(
f"Dipole emission batch data is missing task '{task_name}' from "
"to_simulations(...). "
f"Original error: {exc}"
) from exc
except (TypeError, AttributeError) as exc:
raise SetupError(
"DipoleEmissionStudy.compose(...) expects batch data that supports "
f"task-name lookup. Original error: {exc}"
) from exc
self._validate_task_simulation_source(
sim_data=sim_data,
task_name=task_name,
angle_index=angle_index,
polarization=polarization,
)
try:
monitor_data = sim_data[EMISSION_MONITOR_NAME]
except KeyError as exc:
raise SetupError(
f"Dipole emission task '{task_name}' is missing monitor "
f"'{EMISSION_MONITOR_NAME}'. Original error: {exc}"
) from exc
except (TypeError, AttributeError) as exc:
raise SetupError(
f"Dipole emission task '{task_name}' does not contain ordinary "
f"SimulationData monitor lookup. Original error: {exc}"
) from exc
monitor_data_by_task[(angle_index, polarization)] = monitor_data
return monitor_data_by_task
def _validate_task_simulation_source(
self,
sim_data: Any,
task_name: str,
angle_index: int,
polarization: Polarization,
) -> None:
"""Validate generated source provenance when batch data exposes a simulation."""
simulation = getattr(sim_data, "simulation", None)
if simulation is None:
return
sources = tuple(getattr(simulation, "sources", ()))
theta_values, phi_values = self._angle_values()
expected_source = self._source(
angle_index=angle_index,
theta=theta_values[angle_index],
phi=phi_values[angle_index],
polarization=polarization,
)
if len(sources) != 1 or not isinstance(sources[0], TFSF):
raise SetupError(
f"Dipole emission task '{task_name}' must contain the generated TFSF source."
)
source = sources[0]
numeric_checks = (
("center", source.center, expected_source.center),
("size", source.size, expected_source.size),
("angle_theta", source.angle_theta, expected_source.angle_theta),
("angle_phi", source.angle_phi, expected_source.angle_phi),
("pol_angle", source.pol_angle, expected_source.pol_angle),
)
for field_name, actual, expected in numeric_checks:
if not np.allclose(np.asarray(actual), np.asarray(expected), rtol=1e-12, atol=0.0):
raise SetupError(
f"Dipole emission task '{task_name}' has source '{field_name}' that "
"does not match this study."
)
scalar_checks = (
("name", source.name, expected_source.name),
("direction", source.direction, expected_source.direction),
("injection_axis", source.injection_axis, expected_source.injection_axis),
("angular_spec", type(source.angular_spec), type(expected_source.angular_spec)),
("source_time", source.source_time, expected_source.source_time),
)
for field_name, actual, expected in scalar_checks:
if actual != expected:
raise SetupError(
f"Dipole emission task '{task_name}' has source '{field_name}' that "
"does not match this study."
)
def _validate_monitor_array(
self,
values: np.ndarray,
task_label: str,
array_name: str,
expected_shape: tuple[int, ...],
) -> None:
"""Validate reduced monitor arrays before stacking them into study data."""
if values.shape != expected_shape:
raise SetupError(
f"Dipole emission array '{array_name}' in task '{task_label}' has shape "
f"{values.shape}; expected {expected_shape}."
)
def _validate_monitor_data(
self,
monitor_data: Any,
task_label: str,
) -> None:
"""Validate a reduced monitor data object before composing study data."""
from tidy3d.components.data.monitor_data import DipoleEmissionData
if not isinstance(monitor_data, DipoleEmissionData):
raise SetupError(
f"Dipole emission task '{task_label}' must contain DipoleEmissionData; "
f"found {type(monitor_data).__name__}."
)
expected_monitor = self._emission_monitor()
monitor = monitor_data.monitor
if monitor.name != expected_monitor.name:
raise SetupError(
f"Dipole emission monitor data in task '{task_label}' references monitor "
f"'{monitor.name}', expected '{expected_monitor.name}'."
)
if tuple(monitor.store_position_indexes) != tuple(expected_monitor.store_position_indexes):
raise SetupError(
f"Dipole emission monitor data in task '{task_label}' has stored position indexes "
"that do not match this study."
)
numeric_checks = (
("freqs", monitor.freqs, expected_monitor.freqs),
("points", monitor.points.values, expected_monitor.points.values),
("position_weights", monitor.position_weights, expected_monitor.position_weights),
)
for field_name, actual, expected in numeric_checks:
actual_array = np.asarray(actual, dtype=float)
expected_array = np.asarray(expected, dtype=float)
if actual_array.shape != expected_array.shape or not np.allclose(
actual_array, expected_array, rtol=1e-12, atol=0.0
):
raise SetupError(
f"Dipole emission monitor data in task '{task_label}' has '{field_name}' "
"that does not match this study."
)
def _compose_from_monitor_data(
self, monitor_data_by_task: dict[tuple[int, Polarization], Any]
) -> Any:
"""Build ``DipoleEmissionStudyData`` from reduced per-task monitor data."""
from tidy3d.plugins.dipole_emission.data import DipoleEmissionStudyData
num_angles = self.angles.sizes["index"]
num_pols = len(self.polarizations)
num_freqs = len(self.freqs)
num_position_samples = len(self.store_position_indexes)
pol_index_by_label = {
polarization: index for index, polarization in enumerate(self.polarizations)
}
monitor_records = []
result_dtypes = []
expected_monitor_shape = (len(DIPOLE_EMISSION_DIPOLE_AXES), num_freqs)
expected_position_shape = (
num_position_samples,
len(DIPOLE_EMISSION_DIPOLE_AXES),
num_freqs,
)
for task_name, angle_index, polarization in self._expected_task_names():
monitor_data = monitor_data_by_task[(angle_index, polarization)]
task_label = f"{task_name}/{EMISSION_MONITOR_NAME}"
self._validate_monitor_data(monitor_data, task_label)
intensity_values = np.asarray(monitor_data.radiation_intensity.values)
self._validate_monitor_array(
intensity_values,
task_label,
"radiation_intensity",
expected_monitor_shape,
)
result_dtypes.append(intensity_values.dtype)
intensity_position_values = None
if num_position_samples:
if monitor_data.radiation_intensity_at_positions is None:
raise SetupError(
f"Dipole emission task '{task_name}' is missing position-resolved "
"radiation intensity requested by 'store_position_indexes'."
)
intensity_position_values = np.asarray(
monitor_data.radiation_intensity_at_positions.values,
)
self._validate_monitor_array(
intensity_position_values,
task_label,
"radiation_intensity_at_positions",
expected_position_shape,
)
result_dtypes.append(intensity_position_values.dtype)
monitor_records.append(
(angle_index, polarization, intensity_values, intensity_position_values)
)
result_dtype = np.result_type(*result_dtypes)
if not np.issubdtype(result_dtype, np.floating):
result_dtype = float
radiation_intensity_values = np.empty(
(len(DIPOLE_EMISSION_DIPOLE_AXES), num_pols, num_angles, num_freqs),
dtype=result_dtype,
)
position_values = None
if num_position_samples:
position_values = np.empty(
(
num_position_samples,
len(DIPOLE_EMISSION_DIPOLE_AXES),
num_pols,
num_angles,
num_freqs,
),
dtype=result_dtype,
)
for (
angle_index,
polarization,
intensity_values,
intensity_position_values,
) in monitor_records:
pol_index = pol_index_by_label[polarization]
radiation_intensity_values[:, pol_index, angle_index, :] = intensity_values
if num_position_samples:
position_values[:, :, pol_index, angle_index, :] = intensity_position_values
radiation_intensity = DipoleEmissionStudyDataArray(
radiation_intensity_values,
dims=DipoleEmissionStudyDataArray._dims,
coords=self._result_coords(),
)
radiation_intensity_at_positions = None
if num_position_samples:
radiation_intensity_at_positions = DipoleEmissionStudyPositionDataArray(
position_values,
dims=DipoleEmissionStudyPositionDataArray._dims,
coords=self._position_result_coords(),
)
return DipoleEmissionStudyData(
radiation_intensity=radiation_intensity,
radiation_intensity_at_positions=radiation_intensity_at_positions,
study=self,
)
[docs]
def to_simulations(self) -> dict[str, Simulation]:
"""Build the simulations for this emission study without submitting them.
Use this method when you want to inspect the study simulations or submit
the batch yourself. The returned simulations preserve any
diagnostic monitors from ``base_sim``; those diagnostic results are not
part of ``DipoleEmissionStudyData`` and remain available only from the raw
batch data.
Returns
-------
dict[str, Simulation]
One ordinary Tidy3D simulation per stored observation angle and far-field
polarization. The simulations are not submitted by this method.
"""
theta_values, phi_values = self._angle_values()
grid_spec = self.base_sim.grid_spec.updated_copy(
override_structures=(
*self.base_sim.grid_spec.override_structures,
self.analysis_region.mesh_override(MESH_OVERRIDE_NAME),
)
)
simulations = {}
for angle_index, (theta, phi) in enumerate(zip(theta_values, phi_values)):
for polarization in self.polarizations:
source = self._source(angle_index, theta, phi, polarization)
emission_monitor = self._emission_monitor()
simulations[self._task_name(angle_index, polarization)] = (
self.base_sim.updated_copy(
sources=(
*self.base_sim.sources,
source,
),
monitors=(*self.base_sim.monitors, emission_monitor),
grid_spec=grid_spec,
normalize_index=None,
)
)
return simulations
[docs]
def compose(self, batch_data: Any) -> Any:
"""Reduce user-managed batch data into ``DipoleEmissionStudyData``.
This method is intended for users who submit simulations from
``to_simulations(...)`` manually and want to produce the same reduced
emission result returned by ``run(...)``.
Parameters
----------
batch_data : td.web.BatchData
Batch results from simulations produced by ``to_simulations(...)``.
Data from any user monitors copied from ``base_sim`` remains in this
raw batch object and is not embedded in the returned
``DipoleEmissionStudyData``.
Returns
-------
DipoleEmissionStudyData
Serializable reduced emission data containing stored ``radiation_intensity`` with
dimensions ``("dipole_axis", "polarization", "angle", "f")``.
``radiation_intensity_transfer(...)`` normalizes ``radiation_intensity`` by
the bulk emitted power at a provided reference refractive index.
Position-resolved arrays are included only when
``store_position_indexes`` is nonempty.
"""
self._check_license()
return self._compose_from_monitor_data(self._monitor_data_from_batch_data(batch_data))
def _run_batch(
self,
folder_name: str,
batch_kwargs: dict[str, Any],
) -> Any:
"""Submit the study simulations."""
from tidy3d.web import Batch
batch_kwargs = dict(batch_kwargs)
batch_init_kwargs = {}
for key in (
"verbose",
"solver_version",
"callback_url",
"num_workers",
"reduce_simulation",
"pay_type",
"lazy",
):
if key in batch_kwargs:
batch_init_kwargs[key] = batch_kwargs.pop(key)
batch = Batch(
simulations=self.to_simulations(),
folder_name=folder_name,
**batch_init_kwargs,
)
return batch.run(**batch_kwargs)
[docs]
def run(
self,
folder_name: str = "dipole_emission",
return_batch_data: bool = False,
**batch_kwargs: Any,
) -> Any:
"""Run this emission study and return angular radiation intensity.
The study returns radiation per squared electric dipole moment, with
dipole moment expressed in C*um, summed over the sampled dipole
positions using ``position_weights``. The compact result is indexed by
Cartesian dipole orientation, far-field polarization, observation
angle, and frequency. To recover an absolute angular power density for
a physical dipole moment ``d``, multiply ``radiation_intensity`` by
``|d|**2``.
Parameters
----------
folder_name : str = "dipole_emission"
Cloud folder name used for the study batch.
return_batch_data : bool = False
If ``False``, return only the reduced ``DipoleEmissionStudyData``. If
``True``, return ``(dipole_data, batch_data)`` so copied diagnostic
monitors and raw simulation data remain available to the caller.
**batch_kwargs
Additional keyword arguments forwarded to ``td.web.Batch(...)`` for
batch construction options such as ``verbose`` and to
``td.web.Batch.run(...)`` for run options such as ``path_dir``.
Returns
-------
DipoleEmissionStudyData or tuple[DipoleEmissionStudyData, td.web.BatchData]
Reduced emission data, optionally paired with the raw batch data.
"""
self._check_license()
batch_data = self._run_batch(
folder_name=folder_name,
batch_kwargs=batch_kwargs,
)
emission_data = self.compose(batch_data)
if return_batch_data:
return emission_data, batch_data
return emission_data