# Source code for tidy3d.web.api.run

from __future__ import annotations

import typing
from os import PathLike
from pathlib import Path

from tidy3d.components.types.workflow import WorkflowDataType, WorkflowType
from tidy3d.config import config
from tidy3d.log import get_logging_console
from tidy3d.web.api.autograd.autograd import run as run_autograd
from tidy3d.web.api.autograd.autograd import run_async
from tidy3d.web.api.container import DEFAULT_DATA_DIR, DEFAULT_DATA_PATH
from tidy3d.web.core.types import PayType

# Accepted input of ``run``: either a single workflow object (``WorkflowType``)
# or a ``list`` / ``tuple`` / ``dict`` whose elements (dict *values*) are
# themselves ``RunInput``. The alias is recursive (via the string forward
# reference), so containers may be nested to any depth. Dict keys may be any
# hashable object.
RunInput: typing.TypeAlias = typing.Union[
    WorkflowType,
    list["RunInput"],
    tuple["RunInput", ...],
    dict[typing.Hashable, "RunInput"],
]

# Result counterpart of ``RunInput``: either a single data object
# (``WorkflowDataType``) or a ``list`` / ``tuple`` / ``dict`` whose elements
# (dict *values*) are themselves ``RunOutput``. The alias is recursive (via the
# string forward reference), so it can describe the same nesting as the input.
RunOutput: typing.TypeAlias = typing.Union[
    WorkflowDataType,
    list["RunOutput"],
    tuple["RunOutput", ...],
    dict[typing.Hashable, "RunOutput"],
]


def _collect_by_hash(
    node: RunInput,
    found: dict[str, WorkflowType] | None = None,
) -> dict[str, WorkflowType]:
    """Gather every simulation found in ``node`` into a ``{hash: simulation}`` mapping.

    The structure is walked depth-first: lists and tuples element by element,
    dicts value by value. Each simulation is stored under ``_hash_self()``, so
    a simulation whose hash was already recorded simply overwrites the earlier
    entry - harmless, because entries with the same hash are treated as the
    same simulation.

    Parameters
    ----------
    node : RunInput
        A simulation, or a (possibly nested) ``list`` / ``tuple`` / ``dict``
        of simulations.
    found : dict[str, WorkflowType] | None = None
        Accumulator that is filled in place during recursion. A new dict is
        created when omitted.

    Returns
    -------
    dict[str, WorkflowType]
        The accumulator (the very object passed as ``found`` when one was given).

    Raises
    ------
    ValueError
        If a dict uses a simulation as a key.
    TypeError
        If an element is neither a simulation nor a supported container.
    """
    registry = {} if found is None else found

    if isinstance(node, WorkflowType):
        registry[node._hash_self()] = node
    elif isinstance(node, (list, tuple)):
        for child in node:
            _collect_by_hash(child, registry)
    elif isinstance(node, dict):
        # All keys are validated before descending into any of the values.
        for key in node:
            if isinstance(key, WorkflowType):
                raise ValueError("Dict keys must not be simulations.")
        for child in node.values():
            _collect_by_hash(child, registry)
    else:
        raise TypeError(f"Unsupported element in container: {type(node)!r}")

    return registry


def _reconstruct_by_hash(node: RunInput, h2data: dict[str, WorkflowDataType]) -> RunOutput:
    """Rebuild ``node`` with every simulation swapped for its data object.

    Containers are recreated with the same kind (``list``, ``tuple`` or
    ``dict``; dict keys are kept as they are) and the same ordering. A
    simulation whose hash has not been emitted yet gets the object stored in
    ``h2data``; every later occurrence of that hash gets ``.copy()`` of it, so
    that the returned positions do not share one mutable object.

    Parameters
    ----------
    node : RunInput
        The original input structure passed by the caller.
    h2data : dict[str, WorkflowDataType]
        Mapping from ``_hash_self()`` of each simulation to its data object.

    Returns
    -------
    RunOutput
        A structure shaped like ``node`` holding data objects at the leaves.

    Raises
    ------
    KeyError
        If a simulation's hash has no entry in ``h2data``.
    TypeError
        If an element is neither a simulation nor a supported container.
    """
    emitted: set[str] = set()

    def _swap(item: RunInput) -> RunOutput:
        if isinstance(item, WorkflowType):
            key = item._hash_self()
            result = h2data[key]
            if key not in emitted:
                # First time this hash is placed: hand out the stored object itself.
                emitted.add(key)
                return result
            return result.copy()
        if isinstance(item, dict):
            return {name: _swap(child) for name, child in item.items()}
        if isinstance(item, list):
            return [_swap(child) for child in item]
        if isinstance(item, tuple):
            return tuple(_swap(child) for child in item)
        raise TypeError(f"Unsupported element in reconstruction: {type(item)!r}")

    return _swap(node)


def run(
    simulation: RunInput,
    task_name: typing.Optional[str] = None,
    folder_name: str = "default",
    path: typing.Optional[PathLike] = None,
    callback_url: typing.Optional[str] = None,
    verbose: bool = True,
    progress_callback_upload: typing.Optional[typing.Callable[[float], None]] = None,
    progress_callback_download: typing.Optional[typing.Callable[[float], None]] = None,
    solver_version: typing.Optional[str] = None,
    worker_group: typing.Optional[str] = None,
    simulation_type: str = "tidy3d",
    parent_tasks: typing.Optional[list[str]] = None,
    local_gradient: typing.Optional[bool] = None,
    max_num_adjoint_per_fwd: typing.Optional[int] = None,
    reduce_simulation: typing.Literal["auto", True, False] = "auto",
    pay_type: typing.Union[PayType, str] = PayType.AUTO,
    priority: typing.Optional[int] = None,
    max_workers: typing.Optional[int] = None,
    lazy: typing.Optional[bool] = None,
) -> RunOutput:
    """
    Submit one or many simulations and return results in the same container shape.

    This is a convenience wrapper around the autograd runners that accepts a single
    :class:`WorkflowType` **or** an arbitrarily nested container of simulations
    (``list``, ``tuple``, or ``dict`` values). Internally, all simulations are
    collected, deduplicated by their ``_hash_self()`` value, executed either through
    the single-run runner (exactly one unique simulation) or the batch runner
    (several unique simulations), and the returned data objects are reassembled to
    mirror the input structure.

    **Path behavior**

    - **Single simulation:** ``path`` is a file path. If it has no suffix,
      ``".hdf5"`` is appended (i.e. the file becomes ``f"{path}.hdf5"``); a path
      that already has a suffix is used unchanged.
    - **Multiple simulations:** ``path`` is treated as a **directory** and is
      forwarded to the batch runner as ``path_dir``.

    **Lazy loading**

    - If ``lazy`` is *not* specified: single runs default to ``False`` (eager load);
      batch runs default to ``True`` (proxy objects that load on first access).

    Parameters
    ----------
    simulation : RunInput
        A simulation or a container whose leaves are simulations. Supported
        containers are ``list``, ``tuple``, and ``dict`` (values only).
        Dict **keys must not** be simulations.
    task_name : Optional[str] = None
        Name for a single run. For multiple runs each task is named
        ``f"{task_name}_{hash}"`` (or just the hash when ``task_name`` is empty).
    folder_name : str = "default"
        Folder shown on the web UI.
    path : Optional[PathLike] = None
        Output path. Interpreted as a file path for a single simulation and as a
        directory for multiple simulations. Defaults to ``DEFAULT_DATA_PATH``
        (single simulation) or ``DEFAULT_DATA_DIR`` (multiple simulations).
    callback_url : Optional[str] = None
        Optional HTTP PUT endpoint to receive completion events.
    verbose : bool = True
        If ``True``, print status and progress; otherwise run quietly.
    progress_callback_upload : Optional[Callable[[float], None]] = None
        Callback invoked with byte counts during upload (single-run path only).
    progress_callback_download : Optional[Callable[[float], None]] = None
        Callback invoked with byte counts during download (single-run path only).
    solver_version : Optional[str] = None
        Target solver version.
    worker_group : Optional[str] = None
        Worker group to target. Only forwarded in the single-run code path.
    simulation_type : str = "tidy3d"
        Simulation type label passed through to the runners.
    parent_tasks : Optional[list[str]] = None
        Parent task IDs, if any.
    local_gradient : Optional[bool] = None
        Compute gradients locally (more downloads; useful for experimental
        features). Defaults to ``config.adjoint.local_gradient`` when not provided.
    max_num_adjoint_per_fwd : Optional[int] = None
        Maximum number of adjoint simulations allowed per forward run. Defaults to
        ``config.adjoint.max_adjoint_per_fwd`` when not provided.
    reduce_simulation : {"auto", True, False} = "auto"
        Whether to reduce structures to the simulation domain (mode solver only).
    pay_type : Union[PayType, str] = PayType.AUTO
        Payment method selection.
    priority : Optional[int] = None
        Queue priority for vGPU (1 = lowest, 10 = highest).
    max_workers : Optional[int] = None
        Maximum parallel submissions for batch runs (forwarded as ``num_workers``).
        ``None`` submits all at once.
    lazy : Optional[bool] = None
        Whether to load the actual data (``lazy=False``) or return a proxy that
        loads the data when accessed (``lazy=True``).

    Returns
    -------
    RunOutput
        A data object (or nested container of data objects) matching the input
        container shape. Leaves are instances of the corresponding
        :class:`WorkflowDataType`.

    Notes
    -----
    - Simulations are indexed by ``sim._hash_self()``. If a simulation with the
      same hash appears multiple times in the input, it is executed once. The
      first position in the output receives the data object itself; every later
      position receives a ``.copy()`` of it.
    - This wrapper does not post-process the returned data objects itself; they
      are returned as produced by the underlying runners.
    - ``progress_callback_*`` and ``worker_group`` are only used in the
      single-run code path.

    Raises
    ------
    ValueError
        If no simulations are found in ``simulation``, or if a dict key is a
        simulation object.
    TypeError
        If an unsupported container element is encountered.

    Examples
    --------
    Single run (eager by default):

    .. code-block:: python

        sim_data = run(sim, task_name="wg_bend", path="out/bend")
        # writes: "out/bend.hdf5"

    Batch run with nested structure (lazy by default):

    .. code-block:: python

        sims = {
            "coarse": [sim_a, sim_b],
            "fine": sim_c,
        }
        data = run(sims, path="out/batch_dir", max_workers=4)
        # 'data' mirrors 'sims' structure:
        # data["coarse"][0] -> data for sim_a, etc.

    See Also
    --------
    tidy3d.web.api.autograd.autograd.run
        Underlying autograd single-run implementation.
    tidy3d.web.api.autograd.autograd.run_async
        Underlying autograd batch submission implementation.
    """
    h2sim: dict[str, WorkflowType] = _collect_by_hash(simulation)
    if not h2sim:
        raise ValueError("No simulation data found in simulation input.")

    # Resolve adjoint settings from the global config when not given explicitly.
    if local_gradient is None:
        local_gradient = bool(config.adjoint.local_gradient)
    if max_num_adjoint_per_fwd is None:
        max_num_adjoint_per_fwd = config.adjoint.max_adjoint_per_fwd

    key_prefix = ""

    if len(h2sim) == 1:
        if path is None:
            path = DEFAULT_DATA_PATH
        elif not Path(path).suffixes:
            # The caller may have passed a directory-like path without an
            # extension (e.g. the same simulation repeated in a container);
            # turn it into a proper results file name.
            path = f"{path}.hdf5"
            if verbose:
                # Honor ``verbose=False``: stay quiet about the path change.
                console = get_logging_console()
                console.log(f"Changed output path to {path}")

        h, sim = next(iter(h2sim.items()))
        data = {
            h: run_autograd(
                simulation=sim,
                task_name=task_name,
                folder_name=folder_name,
                path=path,
                callback_url=callback_url,
                verbose=verbose,
                progress_callback_upload=progress_callback_upload,
                progress_callback_download=progress_callback_download,
                solver_version=solver_version,
                worker_group=worker_group,
                simulation_type=simulation_type,
                parent_tasks=parent_tasks,
                local_gradient=local_gradient,
                max_num_adjoint_per_fwd=max_num_adjoint_per_fwd,
                reduce_simulation=reduce_simulation,
                pay_type=pay_type,
                priority=priority,
                lazy=lazy if lazy is not None else False,
            )
        }
    else:
        # Batch task names are "<task_name>_<hash>" (or just the hash).
        key_prefix = f"{task_name}_" if task_name else ""
        sims = {f"{key_prefix}{h}": s for h, s in h2sim.items()}
        path_dir = Path(path) if path is not None else Path(DEFAULT_DATA_DIR)
        data = run_async(
            simulations=sims,
            folder_name=folder_name,
            path_dir=path_dir,
            callback_url=callback_url,
            num_workers=max_workers,
            verbose=verbose,
            simulation_type=simulation_type,
            solver_version=solver_version,
            parent_tasks=parent_tasks,
            local_gradient=local_gradient,
            max_num_adjoint_per_fwd=max_num_adjoint_per_fwd,
            reduce_simulation=reduce_simulation,
            pay_type=pay_type,
            priority=priority,
            lazy=lazy if lazy is not None else True,
        )

    # Strip the task-name prefix again so results are keyed by the bare hash.
    h2data: dict[str, WorkflowDataType] = {
        k.removeprefix(key_prefix): sim_data for k, sim_data in data.items()
    }
    return _reconstruct_by_hash(simulation, h2data)