Source code for tidy3d.web.api.mode

"""Web API for mode solver"""

from __future__ import annotations

import os
import pathlib
import tempfile
import time
from datetime import datetime
from typing import Callable, List, Optional

import pydantic.v1 as pydantic
from botocore.exceptions import ClientError
from joblib import Parallel, delayed
from rich.progress import Progress

from ...components.data.monitor_data import ModeSolverData
from ...components.eme.simulation import EMESimulation
from ...components.medium import AbstractCustomMedium
from ...components.simulation import Simulation
from ...components.types import Literal
from ...exceptions import SetupError, WebError
from ...log import get_logging_console, log
from ...plugins.mode.mode_solver import MODE_MONITOR_NAME, ModeSolver
from ...version import __version__
from ..core.core_config import get_logger_console
from ..core.environment import Env
from ..core.http_util import http
from ..core.s3utils import download_file, download_gz_file, upload_file
from ..core.task_core import Folder
from ..core.types import ResourceLifecycle, Submittable

SIMULATION_JSON = "simulation.json"
SIM_FILE_HDF5_GZ = "simulation.hdf5.gz"
MODESOLVER_API = "tidy3d/modesolver/py"
MODESOLVER_JSON = "mode_solver.json"
MODESOLVER_HDF5 = "mode_solver.hdf5"
MODESOLVER_GZ = "mode_solver.hdf5.gz"

MODESOLVER_LOG = "output/result.log"
MODESOLVER_RESULT = "output/result.hdf5"
MODESOLVER_RESULT_GZ = "output/mode_solver_data.hdf5.gz"

DEFAULT_NUM_WORKERS = 10
DEFAULT_MAX_RETRIES = 3
DEFAULT_RETRY_DELAY = 10  # in seconds


[docs] def run( mode_solver: ModeSolver, task_name: str = "Untitled", mode_solver_name: str = "mode_solver", folder_name: str = "Mode Solver", results_file: str = "mode_solver.hdf5", verbose: bool = True, progress_callback_upload: Callable[[float], None] = None, progress_callback_download: Callable[[float], None] = None, reduce_simulation: Literal["auto", True, False] = "auto", ) -> ModeSolverData: """Submits a :class:`.ModeSolver` to server, starts running, monitors progress, downloads, and loads results as a :class:`.ModeSolverData` object. Parameters ---------- mode_solver : :class:`.ModeSolver` Mode solver to upload to server. task_name : str = "Untitled" Name of task. mode_solver_name: str = "mode_solver" The name of the mode solver to create the in task. folder_name : str = "Mode Solver" Name of folder to store task on web UI. results_file : str = "mode_solver.hdf5" Path to download results file (.hdf5). verbose : bool = True If ``True``, will print status, otherwise, will run silently. progress_callback_upload : Callable[[float], None] = None Optional callback function called when uploading file with ``bytes_in_chunk`` as argument. progress_callback_download : Callable[[float], None] = None Optional callback function called when downloading file with ``bytes_in_chunk`` as argument. reduce_simulation : Literal["auto", True, False] = "auto" Restrict simulation to mode solver region. If "auto", then simulation is automatically restricted if it contains custom mediums. Returns ------- :class:`.ModeSolverData` Mode solver data with the calculated results. """ log_level = "DEBUG" if verbose else "INFO" if verbose: console = get_logging_console() if reduce_simulation == "auto": sim_mediums = mode_solver.simulation.scene.mediums contains_custom = any(isinstance(med, AbstractCustomMedium) for med in sim_mediums) reduce_simulation = contains_custom if reduce_simulation: log.warning( "The associated 'Simulation' object contains custom mediums. It will be " "automatically restricted to the mode solver plane to reduce data for uploading. " "To force uploading the original 'Simulation' object use 'reduce_simulation=False'." " Setting 'reduce_simulation=True' will force simulation reduction in all cases and" " silence this warning." ) if reduce_simulation: mode_solver = mode_solver.reduced_simulation_copy task = ModeSolverTask.create(mode_solver, task_name, mode_solver_name, folder_name) if verbose: console.log( f"Mode solver created with task_id='{task.task_id}', solver_id='{task.solver_id}'." ) task.upload(verbose=verbose, progress_callback=progress_callback_upload) task.submit() # Wait for task to finish prev_status = "draft" status = task.status while status not in ("success", "error", "diverged", "deleted"): if status != prev_status: log.log(log_level, f"Mode solver status: {status}") if verbose: console.log(f"Mode solver status: {status}") prev_status = status time.sleep(0.5) status = task.get_info().status if status == "error": raise WebError("Error running mode solver.") log.log(log_level, f"Mode solver status: {status}") if verbose: console.log(f"Mode solver status: {status}") if status != "success": # Our cache discards None, so the user is able to re-run return None return task.get_result( to_file=results_file, verbose=verbose, progress_callback=progress_callback_download )
[docs] def run_batch( mode_solvers: List[ModeSolver], task_name: str = "BatchModeSolver", folder_name: str = "BatchModeSolvers", results_files: List[str] = None, verbose: bool = True, max_workers: int = DEFAULT_NUM_WORKERS, max_retries: int = DEFAULT_MAX_RETRIES, retry_delay: float = DEFAULT_RETRY_DELAY, progress_callback_upload: Callable[[float], None] = None, progress_callback_download: Callable[[float], None] = None, ) -> List[ModeSolverData]: """ Submits a batch of ModeSolver to the server concurrently, manages progress, and retrieves results. Parameters ---------- mode_solvers : List[ModeSolver] List of mode solvers to be submitted to the server. task_name : str Base name for tasks. Each task in the batch will have a unique index appended to this base name. folder_name : str Name of the folder where tasks are stored on the server's web UI. results_files : List[str], optional List of file paths where the results for each ModeSolver should be downloaded. If None, a default path based on the folder name and index is used. verbose : bool If True, displays a progress bar. If False, runs silently. max_workers : int Maximum number of concurrent workers to use for processing the batch of simulations. max_retries : int Maximum number of retries for each simulation in case of failure before giving up. retry_delay : int Delay in seconds between retries when a simulation fails. progress_callback_upload : Callable[[float], None], optional Optional callback function called when uploading file with ``bytes_in_chunk`` as argument. progress_callback_download : Callable[[float], None], optional Optional callback function called when downloading file with ``bytes_in_chunk`` as argument. Returns ------- List[ModeSolverData] A list of ModeSolverData objects containing the results from each simulation in the batch. ``None`` is placed in the list for simulations that fail after all retries. """ console = get_logging_console() if not all(isinstance(x, ModeSolver) for x in mode_solvers): console.log( "Validation Error: All items in `mode_solvers` must be instances of `ModeSolver`." ) return [] num_mode_solvers = len(mode_solvers) if results_files is None: results_files = [f"mode_solver_batch_results_{i}.hdf5" for i in range(num_mode_solvers)] def handle_mode_solver(index, progress, pbar): retries = 0 while retries <= max_retries: try: result = run( mode_solver=mode_solvers[index], task_name=f"{task_name}_{index}", mode_solver_name=f"mode_solver_batch_{index}", folder_name=folder_name, results_file=results_files[index], verbose=False, progress_callback_upload=progress_callback_upload, progress_callback_download=progress_callback_download, ) if verbose: progress.update(pbar, advance=1) return result except Exception as e: console.log(f"Error in mode solver {index}: {str(e)}") if retries < max_retries: time.sleep(retry_delay) retries += 1 else: console.log(f"The {index}-th mode solver failed after {max_retries} tries.") if verbose: progress.update(pbar, advance=1) return None if verbose: console.log(f"[cyan]Running a batch of [deep_pink4]{num_mode_solvers} mode solvers.\n") # Create the common folder before running the parallel computation _ = Folder.create(folder_name=folder_name) with Progress(console=console) as progress: pbar = progress.add_task("Status:", total=num_mode_solvers) results = Parallel(n_jobs=max_workers, backend="threading")( delayed(handle_mode_solver)(i, progress, pbar) for i in range(num_mode_solvers) ) # Make sure the progress bar is complete progress.update(pbar, completed=num_mode_solvers, refresh=True) console.log("[green]A batch of `ModeSolver` tasks completed successfully!") else: results = Parallel(n_jobs=max_workers, backend="threading")( delayed(handle_mode_solver)(i, None, None) for i in range(num_mode_solvers) ) return results
[docs] class ModeSolverTask(ResourceLifecycle, Submittable, extra=pydantic.Extra.allow): """Interface for managing the running of a :class:`.ModeSolver` task on server.""" task_id: str = pydantic.Field( None, title="task_id", description="Task ID number, set when the task is created, leave as None.", alias="refId", ) solver_id: str = pydantic.Field( None, title="solver", description="Solver ID number, set when the task is created, leave as None.", alias="id", ) real_flex_unit: float = pydantic.Field( None, title="real FlexCredits", description="Billed FlexCredits.", alias="charge" ) created_at: Optional[datetime] = pydantic.Field( title="created_at", description="Time at which this task was created.", alias="createdAt" ) status: str = pydantic.Field( None, title="status", description="Mode solver task status.", ) file_type: str = pydantic.Field( None, title="file_type", description="File type used to upload the mode solver.", alias="fileType", ) mode_solver: ModeSolver = pydantic.Field( None, title="mode_solver", description="Mode solver being run by this task.", )
[docs] @classmethod def create( cls, mode_solver: ModeSolver, task_name: str = "Untitled", mode_solver_name: str = "mode_solver", folder_name: str = "Mode Solver", ) -> ModeSolverTask: """Create a new mode solver task on the server. Parameters ---------- mode_solver: :class".ModeSolver" The object that will be uploaded to server in the submitting phase. task_name: str = "Untitled" The name of the task. mode_solver_name: str = "mode_solver" The name of the mode solver to create the in task. folder_name: str = "Mode Solver" The name of the folder to store the task. Returns ------- :class:`ModeSolverTask` :class:`ModeSolverTask` object containing information about the created task. """ folder = Folder.get(folder_name, create=True) mode_solver.validate_pre_upload() if isinstance(mode_solver.simulation, Simulation): mode_solver.simulation.validate_pre_upload(source_required=False) elif isinstance(mode_solver.simulation, EMESimulation): # TODO: replace this with native web api support raise SetupError( "'EMESimulation' is not yet supported in the " "remote mode solver web api. Please instead call 'ModeSolver.to_fdtd_mode_solver' " "before using the web api; this replaces the 'EMESimulation' with a 'Simulation' " "that can be used in the remote mode solver. " "Alternatively, you can add a 'ModeSolverMonitor' to the 'EMESimulation' " "and use the EME solver web api." ) # mode_solver.simulation.validate_pre_upload() else: raise SetupError("Simulation type not supported in the remote mode solver web api.") response_body = { "projectId": folder.folder_id, "taskName": task_name, "protocolVersion": __version__, "modeSolverName": mode_solver_name, "fileType": "Gz", "source": "Python", } resp = http.post(MODESOLVER_API, response_body) # TODO: actually fix the root cause later. # sometimes POST for mode returns None and crashes the log.info call. # For now this catches it and raises a better error that we can debug. if resp is None: raise WebError( "'ModeSolver' POST request returned 'None'. If you received this error, please " "raise an issue on the Tidy3D front end GitHub repository referencing the following" f"response body '{response_body}'." ) log.info( "Mode solver created with task_id='%s', solver_id='%s'.", resp["refId"], resp["id"] ) return ModeSolverTask(**resp, mode_solver=mode_solver)
[docs] @classmethod def get( cls, task_id: str, solver_id: str, to_file: str = "mode_solver.hdf5", sim_file: str = "simulation.hdf5", verbose: bool = True, progress_callback: Callable[[float], None] = None, ) -> ModeSolverTask: """Get mode solver task from the server by id. Parameters ---------- task_id: str Unique identifier of the task on server. solver_id: str Unique identifier of the mode solver in the task. to_file: str = "mode_solver.hdf5" File to store the mode solver downloaded from the task. sim_file: str = "simulation.hdf5" File to store the simulation downloaded from the task. verbose: bool = True Whether to display progress bars. progress_callback : Callable[[float], None] = None Optional callback function called while downloading the data. Returns ------- :class:`ModeSolverTask` :class:`ModeSolverTask` object containing information about the task. """ resp = http.get(f"{MODESOLVER_API}/{task_id}/{solver_id}") task = ModeSolverTask(**resp) mode_solver = task.get_modesolver(to_file, sim_file, verbose, progress_callback) return task.copy(update={"mode_solver": mode_solver})
[docs] def get_info(self) -> ModeSolverTask: """Get the current state of this task on the server. Returns ------- :class:`ModeSolverTask` :class:`ModeSolverTask` object containing information about the task, without the mode solver. """ resp = http.get(f"{MODESOLVER_API}/{self.task_id}/{self.solver_id}") return ModeSolverTask(**resp, mode_solver=self.mode_solver)
[docs] def upload( self, verbose: bool = True, progress_callback: Callable[[float], None] = None ) -> None: """Upload this task's 'mode_solver' to the server. Parameters ---------- verbose: bool = True Whether to display progress bars. progress_callback : Callable[[float], None] = None Optional callback function called while uploading the data. """ mode_solver = self.mode_solver.copy() sim = mode_solver.simulation # Upload simulation.hdf5.gz for GUI display file, file_name = tempfile.mkstemp(".hdf5.gz") os.close(file) try: sim.to_hdf5_gz(file_name) upload_file( self.task_id, file_name, SIM_FILE_HDF5_GZ, verbose=verbose, progress_callback=progress_callback, ) finally: os.unlink(file_name) # Upload a single HDF5 file with the full data file, file_name = tempfile.mkstemp(".hdf5.gz") os.close(file) try: mode_solver.to_hdf5_gz(file_name) upload_file( self.solver_id, file_name, MODESOLVER_GZ, verbose=verbose, progress_callback=progress_callback, ) finally: os.unlink(file_name)
[docs] def submit(self): """Start the execution of this task. The mode solver must be uploaded to the server with the :meth:`ModeSolverTask.upload` method before this step. """ http.post( f"{MODESOLVER_API}/{self.task_id}/{self.solver_id}/run", {"enableCaching": Env.current.enable_caching}, )
[docs] def delete(self): """Delete the mode solver and its corresponding task from the server.""" # Delete mode solver http.delete(f"{MODESOLVER_API}/{self.task_id}/{self.solver_id}") # Delete parent task http.delete(f"tidy3d/tasks/{self.task_id}")
[docs] def abort(self): """Abort the mode solver and its corresponding task from the server.""" return http.put( "tidy3d/tasks/abort", json={"taskType": "MODE_SOLVER", "taskId": self.solver_id} )
[docs] def get_modesolver( self, to_file: str = "mode_solver.hdf5", sim_file: str = "simulation.hdf5", verbose: bool = True, progress_callback: Callable[[float], None] = None, ) -> ModeSolver: """Get mode solver associated with this task from the server. Parameters ---------- to_file: str = "mode_solver.hdf5" File to store the mode solver downloaded from the task. sim_file: str = "simulation.hdf5" File to store the simulation downloaded from the task, if any. verbose: bool = True Whether to display progress bars. progress_callback : Callable[[float], None] = None Optional callback function called while downloading the data. Returns ------- :class:`ModeSolver` :class:`ModeSolver` object associated with this task. """ if self.file_type == "Gz": file, file_path = tempfile.mkstemp(".hdf5.gz") os.close(file) try: download_file( self.solver_id, MODESOLVER_GZ, to_file=file_path, verbose=verbose, progress_callback=progress_callback, ) mode_solver = ModeSolver.from_hdf5_gz(file_path) finally: os.unlink(file_path) elif self.file_type == "Hdf5": file, file_path = tempfile.mkstemp(".hdf5") os.close(file) try: download_file( self.solver_id, MODESOLVER_HDF5, to_file=file_path, verbose=verbose, progress_callback=progress_callback, ) mode_solver = ModeSolver.from_hdf5(file_path) finally: os.unlink(file_path) else: file, file_path = tempfile.mkstemp(".json") os.close(file) try: download_file( self.solver_id, MODESOLVER_JSON, to_file=file_path, verbose=verbose, progress_callback=progress_callback, ) mode_solver_dict = ModeSolver.dict_from_json(file_path) finally: os.unlink(file_path) download_file( self.task_id, SIMULATION_JSON, to_file=sim_file, verbose=verbose, progress_callback=progress_callback, ) mode_solver_dict["simulation"] = Simulation.from_json(sim_file) mode_solver = ModeSolver.parse_obj(mode_solver_dict) # Store requested mode solver file mode_solver.to_file(to_file) return mode_solver
[docs] def get_result( self, to_file: str = "mode_solver_data.hdf5", verbose: bool = True, progress_callback: Callable[[float], None] = None, ) -> ModeSolverData: """Get mode solver results for this task from the server. Parameters ---------- to_file: str = "mode_solver_data.hdf5" File to store the mode solver downloaded from the task. verbose: bool = True Whether to display progress bars. progress_callback : Callable[[float], None] = None Optional callback function called while downloading the data. Returns ------- :class:`.ModeSolverData` Mode solver data with the calculated results. """ file = None try: file = download_gz_file( resource_id=self.solver_id, remote_filename=MODESOLVER_RESULT_GZ, to_file=to_file, verbose=verbose, progress_callback=progress_callback, ) except ClientError: if verbose: console = get_logger_console() console.log(f"Unable to download '{MODESOLVER_RESULT_GZ}'.") if not file: try: file = download_file( resource_id=self.solver_id, remote_filename=MODESOLVER_RESULT, to_file=to_file, verbose=verbose, progress_callback=progress_callback, ) except Exception as e: raise WebError( "Failed to download the simulation data file from the server. " "Please confirm that the task was successfully run." ) from e data = ModeSolverData.from_hdf5(to_file) data = data.copy( update={"monitor": self.mode_solver.to_mode_solver_monitor(name=MODE_MONITOR_NAME)} ) self.mode_solver._cached_properties["data_raw"] = data # Perform symmetry expansion self.mode_solver._cached_properties.pop("data", None) return self.mode_solver.data
[docs] def get_log( self, to_file: str = "mode_solver.log", verbose: bool = True, progress_callback: Callable[[float], None] = None, ) -> pathlib.Path: """Get execution log for this task from the server. Parameters ---------- to_file: str = "mode_solver.log" File to store the mode solver downloaded from the task. verbose: bool = True Whether to display progress bars. progress_callback : Callable[[float], None] = None Optional callback function called while downloading the data. Returns ------- path: pathlib.Path Path to saved file. """ return download_file( self.solver_id, MODESOLVER_LOG, to_file=to_file, verbose=verbose, progress_callback=progress_callback, )