"""Provides lowest level, user-facing interface to server."""
from __future__ import annotations
import json
import tempfile
import time
from os import PathLike
from pathlib import Path
from typing import Callable, Literal, Optional, Union
from requests import HTTPError
from rich.progress import BarColumn, Progress, TaskProgressColumn, TextColumn, TimeElapsedColumn
from tidy3d.components.medium import AbstractCustomMedium
from tidy3d.components.mode.mode_solver import ModeSolver
from tidy3d.components.mode.simulation import ModeSimulation
from tidy3d.components.types.workflow import WorkflowDataType, WorkflowType
from tidy3d.config import config
from tidy3d.exceptions import WebError
from tidy3d.log import get_logging_console, log
from tidy3d.web.api.states import (
ALL_POST_VALIDATE_STATES,
END_STATES,
ERROR_STATES,
MAX_STEPS,
STATE_PROGRESS_PERCENTAGE,
status_to_stage,
)
from tidy3d.web.cache import CacheEntry, _store_mode_solver_in_cache, resolve_local_cache
from tidy3d.web.core.account import Account
from tidy3d.web.core.constants import (
CM_DATA_HDF5_GZ,
MODE_DATA_HDF5_GZ,
MODE_FILE_HDF5_GZ,
MODELER_FILE_HDF5_GZ,
SIM_FILE_HDF5,
SIM_FILE_HDF5_GZ,
SIMULATION_DATA_HDF5_GZ,
TaskId,
)
from tidy3d.web.core.task_core import (
BatchDetail,
BatchTask,
Folder,
SimulationTask,
TaskFactory,
WebTask,
)
from tidy3d.web.core.task_info import ChargeType, TaskInfo
from tidy3d.web.core.types import PayType, TaskType
from .connect_util import REFRESH_TIME, get_grid_points_str, get_time_steps_str, wait_for_connection
from .tidy3d_stub import Tidy3dStub, Tidy3dStubData
# Polling interval (seconds) between run-status checks while the solver is running.
RUN_REFRESH_TIME: float = 1.0
# File name used for the simulation JSON when uploading to S3.
SIM_FILE_JSON: str = "simulation.json"
# Task types that can be opened in the web GUI; not all solvers are supported yet.
GUI_SUPPORTED_TASK_TYPES: list[str] = ["FDTD", "MODE_SOLVER", "HEAT", "TERMINAL_CM"]
# Task types whose solver is in beta stage; cost is subject to change.
BETA_TASK_TYPES: list[str] = ["HEAT", "EME", "HEAT_CHARGE", "VOLUME_MESH"]
# Maps task_type identifiers to human-readable solver names for display.
SOLVER_NAME: dict[str, str] = {
    "FDTD": "FDTD",
    "MODE_SOLVER": "Mode",
    "MODE": "Mode",
    "EME": "EME",
    "HEAT": "Heat",
    "HEAT_CHARGE": "HeatCharge",
    "VOLUME_MESH": "VolumeMesher",
}
def _get_url(task_id: str) -> str:
    """Return the web-UI workbench URL for a given task id."""
    workbench_path = f"workbench?taskId={task_id}"
    return _build_website_url(workbench_path)
def _get_folder_url(folder_id: str) -> str:
    """Return the web-UI URL for a task folder with the given id."""
    folder_path = f"folders/{folder_id}"
    return _build_website_url(folder_path)
def _get_url_rf(resource_id: str) -> str:
    """Return the RF GUI URL for a modeler/batch group resource."""
    rf_path = f"rf?taskId={resource_id}"
    return _build_website_url(rf_path)
def _build_website_url(path: str) -> str:
    """Join ``path`` onto the configured website endpoint, normalizing slashes.

    An empty ``path`` returns the bare endpoint unchanged.
    """
    endpoint = str(config.web.website_endpoint or "")
    if not path:
        return endpoint
    suffix = str(path).lstrip("/")
    return f"{endpoint.rstrip('/')}/{suffix}"
def _batch_detail_error(resource_id: str) -> None:
    """Inspect a failed batch job and raise a detailed error.

    Fetches the batch detail for ``resource_id``. If the status indicates an
    error, a specific :class:`WebError` is raised. For validation failures,
    detailed error messages from each subtask are parsed and aggregated.
    Returns silently when the batch is not in an error state.

    Parameters
    ----------
    resource_id : str
        The identifier of the batch resource to inspect.

    Raises
    ------
    WebError
        If batch details cannot be retrieved, or if the batch is in an
        error state.
    """
    # TODO: test properly
    try:
        batch = BatchTask.get(resource_id)
        batch_detail = batch.detail()
        status = batch_detail.status.lower()
    except Exception as e:
        log.error(f"Could not retrieve batch details for '{resource_id}': {e}")
        raise WebError(f"Failed to retrieve status for batch '{resource_id}'.") from e
    if status not in ERROR_STATES:
        return
    if hasattr(batch_detail, "validateErrors") and batch_detail.validateErrors:
        # Validation failure: aggregate per-subtask messages into one error.
        try:
            error_details = []
            for key, error_str in batch_detail.validateErrors.items():
                msg = f"- Subtask '{key}' failed: {error_str}"
                error_details.append(msg)
            details_string = "\n".join(error_details)
            full_error_msg = (
                "One or more subtasks failed validation. Please fix the component modeler "
                "configuration.\n"
                f"Details:\n{details_string}"
            )
        except Exception as e:
            raise WebError(
                "One or more subtasks failed validation. Failed to parse validation errors."
            ) from e
        raise WebError(full_error_msg)
    # Handle all other generic error states
    else:
        error_msg = (
            f"Batch '{resource_id}' failed with status '{status}'. Check server "
            "logs for details or contact customer support."
        )
        raise WebError(error_msg)
def _copy_simulation_data_from_cache_entry(entry: CacheEntry, path: PathLike) -> bool:
"""
Copy cached simulation data from a cache entry to a specified path.
Parameters
----------
entry : CacheEntry
The cache entry containing simulation data and metadata.
path : PathLike
The target directory or file path where the cached data should be materialized.
Returns
-------
bool
True if the cached simulation data was successfully copied, False otherwise.
"""
if entry is not None:
try:
entry.materialize(Path(path))
return True
except Exception:
return False
return False
def restore_simulation_if_cached(
    simulation: WorkflowType,
    path: Optional[PathLike] = None,
    reduce_simulation: Literal["auto", True, False] = "auto",
    verbose: bool = True,
) -> tuple[Optional[PathLike], Optional[TaskId]]:
    """
    Attempt to restore simulation data from the local cache, if available.

    Parameters
    ----------
    simulation : WorkflowType
        The simulation or workflow object for which cached data may exist.
    path : Optional[PathLike] = None
        Optional destination for the cached data. When omitted, the path
        stored in the cache entry is returned instead.
    reduce_simulation : Literal["auto", True, False] = "auto"
        Whether to reduce the simulation for the cache lookup. With "auto",
        reduction is applied only for mode solvers / mode simulations.
    verbose : bool = True
        If ``True``, logs a link to the cached task in the web UI.

    Returns
    -------
    Optional[PathLike]
        Path of the restored data (the given ``path`` if copying succeeded,
        otherwise the cache entry's own artifact path), or ``None`` when no
        cached data was found.
    Optional[TaskId]
        Original task id recorded for the cached data, if any.
    """
    cache = resolve_local_cache()
    if cache is None:
        return None, None
    # Mode solvers are cached in reduced form, so look them up the same way.
    lookup_sim = simulation
    if isinstance(simulation, (ModeSolver, ModeSimulation)):
        lookup_sim = get_reduced_simulation(simulation, reduce_simulation)
    entry = cache.try_fetch(simulation=lookup_sim, verbose=verbose)
    if entry is None:
        return None, None
    if path is not None:
        restored_path = path if _copy_simulation_data_from_cache_entry(entry, path) else None
    else:
        restored_path = entry.artifact_path
    cached_task_id = entry.metadata.get("task_id")
    cached_workflow_type = entry.metadata.get("workflow_type")
    if verbose and cached_task_id is not None and cached_workflow_type is not None:
        url, _ = _get_task_urls(cached_workflow_type, cached_task_id)
        get_logging_console().log(
            f"Loading simulation from local cache. View cached task using web UI at [link={url}]'{url}'[/link]."
        )
    return restored_path, cached_task_id
def load_simulation_if_cached(
    simulation: WorkflowType,
    path: Optional[PathLike] = None,
    reduce_simulation: Literal["auto", True, False] = "auto",
    verbose: bool = True,
) -> Optional[WorkflowDataType]:
    """
    Load simulation results directly from the local cache, if available.

    Parameters
    ----------
    simulation : WorkflowType
        The simulation or workflow object to check for cached results.
    path : Optional[PathLike] = None
        Optional path to which cached data should be restored before loading.
    reduce_simulation : Literal["auto", True, False] = "auto"
        Whether to use a reduced simulation for the cache lookup. With
        "auto", reduction is applied automatically for mode solvers.
    verbose : bool = True
        If ``True``, logs a link to the cached task in the web UI on loading.

    Returns
    -------
    Optional[WorkflowDataType]
        The loaded simulation data on a cache hit, otherwise ``None``.
    """
    restored_path, _ = restore_simulation_if_cached(
        simulation, path, reduce_simulation, verbose=verbose
    )
    if restored_path is None:
        return None
    data = load(
        task_id=None,
        path=str(restored_path),
        verbose=verbose,
    )
    # Mode solvers keep a reference to their data; patch it in on load.
    if isinstance(simulation, ModeSolver):
        simulation._patch_data(data=data)
    return data
@wait_for_connection
def run(
    simulation: WorkflowType,
    task_name: Optional[str] = None,
    folder_name: str = "default",
    path: PathLike = "simulation_data.hdf5",
    callback_url: Optional[str] = None,
    verbose: bool = True,
    progress_callback_upload: Optional[Callable[[float], None]] = None,
    progress_callback_download: Optional[Callable[[float], None]] = None,
    solver_version: Optional[str] = None,
    worker_group: Optional[str] = None,
    simulation_type: str = "tidy3d",
    parent_tasks: Optional[list[str]] = None,
    reduce_simulation: Literal["auto", True, False] = "auto",
    pay_type: Union[PayType, str] = PayType.AUTO,
    priority: Optional[int] = None,
    lazy: bool = False,
) -> WorkflowDataType:
    """
    Submit a :class:`.Simulation` to the server, run it, monitor progress,
    download the results, and load them as a :class:`.WorkflowDataType` object.

    If a matching result is found in the local cache, the upload/run/monitor
    steps are skipped and the cached data is loaded instead.

    Parameters
    ----------
    simulation : Union[:class:`.Simulation`, :class:`.HeatSimulation`, :class:`.EMESimulation`]
        Simulation to upload to server.
    task_name : Optional[str] = None
        Name of task. If not provided, a default name will be generated.
    folder_name : str = "default"
        Name of folder to store task on web UI.
    path : PathLike = "simulation_data.hdf5"
        Path to download results file (.hdf5), including filename.
    callback_url : str = None
        Http PUT url to receive simulation finish event. The body content is a json file with
        fields ``{'id', 'status', 'name', 'workUnit', 'solverVersion'}``.
    verbose : bool = True
        If ``True``, will print progressbars and status, otherwise, will run silently.
    progress_callback_upload : Callable[[float], None] = None
        Optional callback function called when uploading file with ``bytes_in_chunk`` as argument.
    progress_callback_download : Callable[[float], None] = None
        Optional callback function called when downloading file with ``bytes_in_chunk`` as argument.
    solver_version : str = None
        Target solver version.
    worker_group : str = None
        Worker group.
    simulation_type : str = "tidy3d"
        Type of simulation being uploaded.
    parent_tasks : Optional[list[str]] = None
        List of related task ids.
    reduce_simulation : Literal["auto", True, False] = "auto"
        Whether to reduce structures in the simulation to the simulation domain only.
        Note: currently only implemented for the mode solver.
    pay_type : Union[PayType, str] = PayType.AUTO
        Which method to pay the simulation.
    priority : int = None
        Priority of the simulation in the Virtual GPU (vGPU) queue (1 = lowest, 10 = highest).
        It affects only simulations from vGPU licenses and does not impact simulations using
        FlexCredits.
    lazy : bool = False
        Whether to load the actual data (``lazy=False``) or return a proxy that loads
        the data when accessed (``lazy=True``).

    Returns
    -------
    Union[:class:`.SimulationData`, :class:`.HeatSimulationData`, :class:`.EMESimulationData`]
        Object containing solver results for the supplied simulation.

    Examples
    --------
    .. code-block:: python

        sim_data = web.run(simulation, task_name='task_name', path='out/sim.hdf5')
        # Get a copy of the original simulation object.
        sim_copy = sim_data.simulation

    See Also
    --------
    :meth:`tidy3d.web.api.webapi.monitor`
        Print the real time task progress until completion.
    :meth:`tidy3d.web.api.container.Job.monitor`
        Monitor progress of running :class:`Job`.
    :meth:`tidy3d.web.api.container.Batch.monitor`
        Monitor progress of each of the running tasks.
    """
    cached_path, _ = restore_simulation_if_cached(
        simulation=simulation,
        path=path,
        reduce_simulation=reduce_simulation,
        verbose=verbose,
    )
    task_id = None
    if not cached_path:
        # No cache hit: upload, start, and wait for the run to finish.
        task_id = upload(
            simulation=simulation,
            task_name=task_name,
            folder_name=folder_name,
            callback_url=callback_url,
            verbose=verbose,
            progress_callback=progress_callback_upload,
            simulation_type=simulation_type,
            parent_tasks=parent_tasks,
            solver_version=solver_version,
            reduce_simulation=reduce_simulation,
        )
        start(
            task_id,
            solver_version=solver_version,
            worker_group=worker_group,
            pay_type=pay_type,
            priority=priority,
        )
        monitor(task_id, verbose=verbose)
    data = load(
        task_id=task_id,
        path=path,
        verbose=verbose,
        progress_callback=progress_callback_download,
        lazy=lazy,
    )
    if isinstance(simulation, ModeSolver):
        # Freshly-run mode solver results are stored in the local cache.
        if task_id is not None:
            _store_mode_solver_in_cache(task_id, simulation, data, path)
        simulation._patch_data(data=data)
    return data
def _get_task_urls(
    task_type: str,
    resource_id: str,
    folder_id: Optional[str] = None,
    group_id: Optional[str] = None,
) -> tuple[str, Optional[str]]:
    """Build the web-UI URLs for a task and, optionally, its folder.

    RF/component-modeler task types link to the RF GUI (preferring the
    group id when present); everything else links to the workbench.
    """
    if task_type in ["RF", "TERMINAL_CM", "MODAL_CM"]:
        url = _get_url_rf(group_id or resource_id)
    else:
        url = _get_url(resource_id)
    folder_url = _get_folder_url(folder_id) if folder_id is not None else None
    return url, folder_url
@wait_for_connection
def upload(
    simulation: WorkflowType,
    task_name: Optional[str] = None,
    folder_name: str = "default",
    callback_url: Optional[str] = None,
    verbose: bool = True,
    progress_callback: Optional[Callable[[float], None]] = None,
    simulation_type: str = "tidy3d",
    parent_tasks: Optional[list[str]] = None,
    source_required: bool = True,
    solver_version: Optional[str] = None,
    reduce_simulation: Literal["auto", True, False] = "auto",
) -> TaskId:
    """
    Upload simulation to server, but do not start running :class:`.Simulation`.

    Parameters
    ----------
    simulation : Union[:class:`.Simulation`, :class:`.HeatSimulation`, :class:`.EMESimulation`]
        Simulation to upload to server.
    task_name : Optional[str] = None
        Name of task. If not provided, a default name will be generated.
    folder_name : str = "default"
        Name of folder to store task on web UI.
    callback_url : str = None
        Http PUT url to receive simulation finish event. The body content is a json file with
        fields ``{'id', 'status', 'name', 'workUnit', 'solverVersion'}``.
    verbose : bool = True
        If ``True``, will print progressbars and status, otherwise, will run silently.
    progress_callback : Callable[[float], None] = None
        Optional callback function called when uploading file with ``bytes_in_chunk`` as argument.
    simulation_type : str = "tidy3d"
        Type of simulation being uploaded.
    parent_tasks : Optional[list[str]] = None
        List of related task ids.
    source_required : bool = True
        If ``True``, simulations without sources will raise an error before being uploaded.
    solver_version : str = None
        Target solver version.
    reduce_simulation : Literal["auto", True, False] = "auto"
        Whether to reduce structures in the simulation to the simulation domain only.
        Note: currently only implemented for the mode solver.

    Returns
    -------
    str
        Unique identifier of task on server.

    Notes
    -----
    The uploaded task will not run until you explicitly tell it to do so with
    :meth:`tidy3d.web.api.webapi.start`.

    .. code-block:: python

        web.upload(simulation, task_name="task_name", verbose=verbose)
    """
    console = get_logging_console() if verbose else None
    if isinstance(simulation, (ModeSolver, ModeSimulation)):
        simulation = get_reduced_simulation(simulation, reduce_simulation)
    stub = Tidy3dStub(simulation=simulation)
    stub.validate_pre_upload(source_required=source_required)
    log.debug("Creating task.")
    if task_name is None:
        task_name = stub.get_default_task_name()
    task_type = stub.get_type()
    task = WebTask.create(
        task_type,
        task_name,
        folder_name,
        callback_url,
        simulation_type,
        parent_tasks,
        "Gz",
    )
    group_id = getattr(task, "groupId", None)
    resource_id = task.task_id
    if verbose:
        console.log(
            f"Created task '{task_name}' with resource_id '{resource_id}' and task_type '{task_type}'."
        )
        if task_type in BETA_TASK_TYPES:
            solver_name = SOLVER_NAME[task_type]
            console.log(
                f"Tidy3D's {solver_name} solver is currently in the beta stage. "
                f"Cost of {solver_name} simulations is subject to change in the future."
            )
        if task_type in GUI_SUPPORTED_TASK_TYPES:
            url, folder_url = _get_task_urls(task_type, resource_id, task.folder_id, group_id)
            console.log(f"View task using web UI at [link={url}]'{url}'[/link].")
            console.log(f"Task folder: [link={folder_url}]'{task.folder_name}'[/link].")
    # Pick the remote file name expected by the server for this task type.
    remote_sim_file = SIM_FILE_HDF5_GZ
    if task_type == "MODE_SOLVER":
        remote_sim_file = MODE_FILE_HDF5_GZ
    elif task_type in ["RF", "TERMINAL_CM", "MODAL_CM"]:
        remote_sim_file = MODELER_FILE_HDF5_GZ
    task.upload_simulation(
        stub=stub,
        verbose=verbose,
        progress_callback=progress_callback,
        remote_sim_file=remote_sim_file,
    )
    # Estimate the task cost right after upload (reported if verbose).
    estimate_cost(task_id=resource_id, solver_version=solver_version, verbose=verbose)
    task.validate_post_upload(parent_tasks=parent_tasks)
    return resource_id
def get_reduced_simulation(
    simulation: WorkflowType, reduce_simulation: Literal["auto", True, False]
) -> WorkflowType:
    """
    Adjust the given simulation object based on the reduce_simulation parameter. Currently only
    implemented for the mode solver.

    Parameters
    ----------
    simulation : Simulation
        The simulation object to be potentially reduced.
    reduce_simulation : Literal["auto", True, False]
        Determines whether to reduce the simulation. If "auto", the function will decide based on
        the presence of custom mediums in the simulation.

    Returns
    -------
    Simulation
        The potentially reduced simulation object.
    """
    # TODO: This only works for the mode solver, which is also why
    # `simulation.simulation.scene` is used below. After refactor to use the new
    # ModeSimulation, it should be possible to put the call to this function outside of
    # the MODE_SOLVER check in the upload function. We could implement dummy
    # `reduced_simulation_copy` methods for the other solvers or also implement
    # reductions there. Note that if we do the latter we may want to also modify the
    # warning below to only happen if there are custom media *and* they extend beyond
    # the simulation domain.
    if reduce_simulation == "auto":
        # ModeSimulation exposes the scene directly; a ModeSolver wraps a simulation.
        if isinstance(simulation, ModeSimulation):
            sim_mediums = simulation.scene.mediums
        else:
            sim_mediums = simulation.simulation.scene.mediums
        contains_custom = any(isinstance(med, AbstractCustomMedium) for med in sim_mediums)
        reduce_simulation = contains_custom
        if reduce_simulation:
            log.warning(
                f"The {type(simulation)} object contains custom mediums. It will be "
                "automatically restricted to the solver domain to reduce data for uploading. "
                "To force uploading the original object use 'reduce_simulation=False'."
                " Setting 'reduce_simulation=True' will force simulation reduction in all cases and"
                " silence this warning."
            )
    if reduce_simulation:
        return simulation.reduced_simulation_copy
    return simulation
@wait_for_connection
def get_info(task_id: TaskId, verbose: bool = True) -> TaskInfo | BatchDetail:
    """Return information about a simulation task or a modeler batch.

    This function fetches details for a given task ID, automatically
    distinguishing between a standard simulation task and a modeler batch.

    Parameters
    ----------
    task_id : TaskId
        The unique identifier for the task or batch.
    verbose : bool = True
        If ``True`` (default), display progress bars and status updates.
        If ``False``, the function runs silently.

    Returns
    -------
    TaskInfo | BatchDetail
        A ``TaskInfo`` object for a standard simulation task, or a
        ``BatchDetail`` object for a modeler batch.

    Raises
    ------
    ValueError
        If no task is found for the given ``task_id``.
    """
    task = TaskFactory.get(task_id, verbose=verbose)
    if not task:
        raise ValueError("Task not found.")
    return task.detail()
@wait_for_connection
def start(
    task_id: TaskId,
    solver_version: Optional[str] = None,
    worker_group: Optional[str] = None,
    pay_type: Union[PayType, str] = PayType.AUTO,
    priority: Optional[int] = None,
) -> None:
    """Start running the simulation associated with task.

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.
    solver_version : str = None
        Target solver version.
    worker_group : str = None
        Worker group.
    pay_type : Union[PayType, str] = PayType.AUTO
        Which method to pay the simulation.
    priority : int = None
        Priority of the simulation in the Virtual GPU (vGPU) queue (1 = lowest, 10 = highest).
        It affects only simulations from vGPU licenses and does not impact simulations using
        FlexCredits.

    Raises
    ------
    ValueError
        If ``priority`` is outside the range 1-10, or if no task is found
        for the given ``task_id``.

    Note
    ----
    To monitor progress, can call :meth:`monitor` after starting simulation.
    """
    if priority is not None and not 1 <= priority <= 10:
        raise ValueError("Priority must be between '1' and '10' if specified.")
    task = TaskFactory.get(task_id)
    if not task:
        raise ValueError("Task not found.")
    task.submit(
        solver_version=solver_version,
        worker_group=worker_group,
        pay_type=pay_type,
        priority=priority,
    )
@wait_for_connection
def get_run_info(task_id: TaskId) -> tuple[Optional[float], Optional[float]]:
    """Get the % done and field decay for a running task.

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.

    Returns
    -------
    perc_done : float
        Percentage of run done (in terms of max number of time steps).
        Is ``None`` if run info not available.
    field_decay : float
        Average field intensity normalized to max value (1.0).
        Is ``None`` if run info not available.
    """
    handle = TaskFactory.get(task_id)
    # Modeler batches do not expose per-task run info.
    if isinstance(handle, BatchTask):
        raise NotImplementedError("Operation not implemented for modeler batches.")
    return handle.get_running_info()
def _get_batch_detail_handle_error_status(batch: BatchTask) -> BatchDetail:
    """Fetch batch detail, raising a descriptive error for failed batches."""
    detail = batch.detail()
    if detail.status.lower() in ERROR_STATES:
        # Delegates to _batch_detail_error, which raises a WebError.
        _batch_detail_error(batch.task_id)
    return detail
def get_status(task_id: TaskId) -> str:
    """Get the status of a task. Raises an error if status is "error".

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.

    Returns
    -------
    str
        The task status; the server-side "visualize" state is reported as
        "success".

    Raises
    ------
    WebError
        If the task is in an error state; includes the server-provided
        error message when it can be retrieved.
    """
    task = TaskFactory.get(task_id)
    if isinstance(task, BatchTask):
        return _get_batch_detail_handle_error_status(task).status
    else:
        task_info = get_info(task_id)
        status = task_info.status
    if status == "visualize":
        return "success"
    if status in ERROR_STATES:
        try:
            # Try to obtain the error message. Use a temporary directory rather
            # than NamedTemporaryFile: reopening an already-open named temporary
            # file by name fails on Windows.
            task = SimulationTask(taskId=task_id)
            with tempfile.TemporaryDirectory() as tmp_dir:
                error_file = Path(tmp_dir) / "error.json"
                task.get_error_json(to_file=str(error_file))
                with open(error_file) as f:
                    error_content = json.load(f)
            error_msg = error_content["msg"]
        except Exception:
            # If the error message could not be obtained, raise a generic error message
            error_msg = "Error message could not be obtained, please contact customer support."
        raise WebError(f"Error running task {task_id}! {error_msg}")
    return status
def monitor(task_id: TaskId, verbose: bool = True, worker_group: Optional[str] = None) -> None:
    """
    Print the real time task progress until completion.

    Notes
    -----
    To monitor the simulation's progress and wait for its completion, use:

    .. code-block:: python

        tidy3d.web.api.webapi.monitor(job.task_id, verbose=verbose)

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.
    verbose : bool = True
        If ``True``, will print progressbars and status, otherwise, will run silently.
    worker_group : Optional[str] = None
        Unused; kept for backward compatibility of the call signature.

    Note
    ----
    To load results when finished, may call :meth:`load`.
    """
    # Batch/modeler monitoring path
    task = TaskFactory.get(task_id)
    if isinstance(task, BatchTask):
        return _monitor_modeler_batch(task_id, verbose=verbose)
    console = get_logging_console() if verbose else None
    task_info = get_info(task_id)
    task_name = task_info.taskName
    task_type = task_info.taskType

    def get_estimated_cost() -> float:
        """Get estimated cost, if None, is not ready."""
        task_info = get_info(task_id)
        block_info = task_info.taskBlockInfo
        if block_info and block_info.chargeType == ChargeType.FREE:
            est_flex_unit = 0
            grid_points = block_info.maxGridPoints
            time_steps = block_info.maxTimeSteps
            grid_points_str = get_grid_points_str(grid_points)
            time_steps_str = get_time_steps_str(time_steps)
            console.log(
                f"You are running this simulation for FREE. Your current plan allows"
                f" up to {block_info.maxFreeCount} free non-concurrent simulations per"
                f" day (under {grid_points_str} grid points and {time_steps_str}"
                f" time steps)"
            )
        else:
            est_flex_unit = task_info.estFlexUnit
        return est_flex_unit

    def monitor_preprocess() -> None:
        """Periodically check the status until the task ends or starts running."""
        status = get_status(task_id)
        while status not in END_STATES and status != "running":
            new_status = get_status(task_id)
            if new_status != status:
                status = new_status
                if verbose and status != "running":
                    console.log(f"status = {status}")
            time.sleep(REFRESH_TIME)

    status = get_status(task_id)
    if verbose:
        console.log(f"status = {status}")
    # already done
    if status in END_STATES:
        return
    # preprocessing
    if verbose:
        console.log(
            "To cancel the simulation, use 'web.abort(task_id)' or 'web.delete(task_id)' "
            "or abort/delete the task in the web "
            "UI. Terminating the Python script will not stop the job running on the cloud."
        )
        with console.status(f"[bold green]Waiting for '{task_name}'...", spinner="runner"):
            monitor_preprocess()
    else:
        monitor_preprocess()
    # if the estimated cost is ready, print it
    if verbose:
        get_estimated_cost()
        console.log("starting up solver")
    # while running but before the percentage done is available, keep waiting
    while get_run_info(task_id)[0] is None and get_status(task_id) == "running":
        time.sleep(REFRESH_TIME)
    # while running but percentage done is available
    if verbose:
        # verbose case, update progressbar
        console.log("running solver")
        if "FDTD" in task_type:
            # FDTD reports both percent done and field decay.
            with Progress(console=console) as progress:
                pbar_pd = progress.add_task("% done", total=100)
                perc_done, _ = get_run_info(task_id)
                while (
                    perc_done is not None and perc_done < 100 and get_status(task_id) == "running"
                ):
                    perc_done, field_decay = get_run_info(task_id)
                    new_description = f"solver progress (field decay = {field_decay:.2e})"
                    progress.update(pbar_pd, completed=perc_done, description=new_description)
                    time.sleep(RUN_REFRESH_TIME)
                perc_done, field_decay = get_run_info(task_id)
                if perc_done is not None and perc_done < 100 and field_decay > 0:
                    console.log(f"early shutoff detected at {perc_done:1.0f}%, exiting.")
                new_description = f"solver progress (field decay = {field_decay:.2e})"
                progress.update(pbar_pd, completed=100, refresh=True, description=new_description)
        elif task_type == "EME":
            # EME reports only percent done.
            with Progress(console=console) as progress:
                pbar_pd = progress.add_task("% done", total=100)
                perc_done, _ = get_run_info(task_id)
                while (
                    perc_done is not None and perc_done < 100 and get_status(task_id) == "running"
                ):
                    perc_done, _ = get_run_info(task_id)
                    new_description = "solver progress"
                    progress.update(pbar_pd, completed=perc_done, description=new_description)
                    time.sleep(RUN_REFRESH_TIME)
                perc_done, _ = get_run_info(task_id)
                new_description = "solver progress"
                progress.update(pbar_pd, completed=100, refresh=True, description=new_description)
        else:
            # Other solvers: poll without a progressbar.
            while get_status(task_id) == "running":
                perc_done, _ = get_run_info(task_id)
                time.sleep(RUN_REFRESH_TIME)
    else:
        # non-verbose case, just keep checking until status is not running or perc_done >= 100
        perc_done, _ = get_run_info(task_id)
        while perc_done is not None and perc_done < 100 and get_status(task_id) == "running":
            perc_done, field_decay = get_run_info(task_id)
            time.sleep(RUN_REFRESH_TIME)
    # post processing
    if verbose:
        status = get_status(task_id)
        if status != "running":
            console.log(f"status = {status}")
        with console.status(f"[bold green]Finishing '{task_name}'...", spinner="runner"):
            while status not in END_STATES:
                new_status = get_status(task_id)
                if new_status != status:
                    status = new_status
                    console.log(f"status = {status}")
                time.sleep(REFRESH_TIME)
        if task_type in GUI_SUPPORTED_TASK_TYPES:
            url = _get_url(task_id)
            console.log(f"View simulation result at [blue underline][link={url}]'{url}'[/link].")
    else:
        while get_status(task_id) not in END_STATES:
            time.sleep(REFRESH_TIME)
@wait_for_connection
def abort(task_id: TaskId) -> Optional[TaskInfo]:
    """Abort server-side data associated with task.

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.

    Returns
    -------
    Optional[TaskInfo]
        Object containing information about status, size, credits of task,
        or ``None`` if no task was found for ``task_id``.
    """
    console = get_logging_console()
    task = TaskFactory.get(task_id, verbose=False)
    if not task:
        return None
    url = task.get_url()
    task.abort()
    console.log(
        f"Task is aborting. View task using web UI at [link={url}]'{url}'[/link] to check the result."
    )
    return TaskInfo(
        **{"taskId": task_id, "taskType": getattr(task, "task_type", None), **task.dict()}
    )
@wait_for_connection
def download(
    task_id: TaskId,
    path: PathLike = "simulation_data.hdf5",
    verbose: bool = True,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> None:
    """Download results of task to file.

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.
    path : PathLike = "simulation_data.hdf5"
        Download path to .hdf5 data file (including filename). For modeler
        batches the default filename is replaced by ``cm_data.hdf5``.
    verbose : bool = True
        If ``True``, will print progressbars and status, otherwise, will run silently.
    progress_callback : Callable[[float], None] = None
        Optional callback function called when downloading file with ``bytes_in_chunk`` as argument.
    """
    path = Path(path)
    task = TaskFactory.get(task_id, verbose=False)
    if isinstance(task, BatchTask):
        # Batches store component-modeler data under a different default name.
        if path.name == "simulation_data.hdf5":
            path = path.with_name("cm_data.hdf5")
        task.get_data_hdf5(
            to_file=path,
            remote_data_file_gz=CM_DATA_HDF5_GZ,
            verbose=verbose,
            progress_callback=progress_callback,
        )
        return
    info = get_info(task_id, verbose=False)
    # Mode-solver results live in a different remote data file.
    remote_data_file = SIMULATION_DATA_HDF5_GZ
    if info.taskType == "MODE_SOLVER":
        remote_data_file = MODE_DATA_HDF5_GZ
    task.get_data_hdf5(
        to_file=path,
        remote_data_file_gz=remote_data_file,
        verbose=verbose,
        progress_callback=progress_callback,
    )
@wait_for_connection
def download_json(task_id: TaskId, path: PathLike = SIM_FILE_JSON, verbose: bool = True) -> None:
    """Download the ``.json`` file associated with the :class:`.Simulation` of a given task.

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.
    path : PathLike = "simulation.json"
        Download path to .json file of simulation (including filename).
    verbose : bool = True
        If ``True``, will print progressbars and status, otherwise, will run silently.

    Raises
    ------
    NotImplementedError
        If ``task_id`` refers to a modeler batch.
    """
    task = TaskFactory.get(task_id, verbose=False)
    if isinstance(task, BatchTask):
        raise NotImplementedError("Operation not implemented for modeler batches.")
    task.get_simulation_json(path, verbose=verbose)
@wait_for_connection
def delete_old(days_old: int, folder_name: str = "default") -> int:
    """Remove folder contents older than ``days_old``.

    Parameters
    ----------
    days_old : int
        Contents older than this many days are removed.
    folder_name : str = "default"
        Name of the folder to prune; created if it does not exist.

    Returns
    -------
    int
        Result of ``Folder.delete_old`` — presumably the number of removed
        tasks (NOTE(review): confirm against ``Folder.delete_old``).
    """
    folder = Folder.get(folder_name, create=True)
    return folder.delete_old(days_old)
@wait_for_connection
def load_simulation(
    task_id: TaskId, path: PathLike = SIM_FILE_JSON, verbose: bool = True
) -> WorkflowType:
    """Download the ``.json`` file of a task and load the associated simulation.

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.
    path : PathLike = "simulation.json"
        Download path to .json or .hdf5 file of simulation (including filename).
    verbose : bool = True
        If ``True``, will print progressbars and status, otherwise, will run silently.

    Returns
    -------
    Union[:class:`.Simulation`, :class:`.HeatSimulation`, :class:`.EMESimulation`]
        Simulation loaded from downloaded json file.

    Raises
    ------
    NotImplementedError
        If ``task_id`` refers to a modeler batch.
    ValueError
        If ``path`` has a suffix other than ``.json`` or ``.hdf5``.
    """
    task = TaskFactory.get(task_id, verbose=False)
    if isinstance(task, BatchTask):
        raise NotImplementedError("Operation not implemented for modeler batches.")
    path = Path(path)
    # Choose the download format based on the requested file extension.
    if path.suffix == ".json":
        task.get_simulation_json(path, verbose=verbose)
    elif path.suffix == ".hdf5":
        task.get_simulation_hdf5(path, verbose=verbose)
    else:
        raise ValueError("Path suffix must be '.json' or '.hdf5'")
    return Tidy3dStub.from_file(path)
@wait_for_connection
def download_log(
    task_id: TaskId,
    path: PathLike = "tidy3d.log",
    verbose: bool = True,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> None:
    """Download the tidy3d log file associated with a task.

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.
    path : PathLike = "tidy3d.log"
        Download path to log file (including filename).
    verbose : bool = True
        If ``True``, will print progressbars and status, otherwise, will run silently.
    progress_callback : Callable[[float], None] = None
        Optional callback function called when downloading file with ``bytes_in_chunk`` as argument.

    Note
    ----
    To load downloaded results into data, call :meth:`load` with option ``replace_existing=False``.
    """
    web_task = TaskFactory.get(task_id, verbose=False)
    # Log download is only defined for single-simulation tasks.
    if isinstance(web_task, BatchTask):
        raise NotImplementedError("Operation not implemented for modeler batches.")
    web_task.get_log(path, verbose=verbose, progress_callback=progress_callback)
[docs]
@wait_for_connection
def load(
    task_id: Optional[TaskId],
    path: PathLike = "simulation_data.hdf5",
    replace_existing: bool = True,
    verbose: bool = True,
    progress_callback: Optional[Callable[[float], None]] = None,
    lazy: bool = False,
) -> WorkflowDataType:
    """
    Download and Load simulation results into :class:`.SimulationData` object.

    Notes
    -----
    After the simulation is complete, you can load the results into a :class:`.SimulationData`
    object by its ``task_id`` using:

    .. code-block:: python

        sim_data = web.load(task_id, path="out/sim.hdf5", verbose=verbose)

    The :meth:`tidy3d.web.api.webapi.load` method is very convenient to load and postprocess
    results from simulations created using Tidy3D GUI.

    Parameters
    ----------
    task_id : Optional[str] = None
        Unique identifier of task on server. Returned by :meth:`upload`. If None, file is
        assumed to exist already from cache.
    path : PathLike
        Download path to .hdf5 data file (including filename).
    replace_existing : bool = True
        Downloads the data even if path exists (overwriting the existing).
    verbose : bool = True
        If ``True``, will print progressbars and status, otherwise, will run silently.
    progress_callback : Callable[[float], None] = None
        Optional callback function called when downloading file with ``bytes_in_chunk``
        as argument.
    lazy : bool = False
        Whether to load the actual data (``lazy=False``) or return a proxy that loads
        the data when accessed (``lazy=True``).

    Returns
    -------
    Union[:class:`.SimulationData`, :class:`.HeatSimulationData`, :class:`.EMESimulationData`]
        Object containing simulation data.
    """
    path = Path(path)
    task = TaskFactory.get(task_id) if task_id else None
    # For component modeler batches, default to a clearer filename if the default was used.
    if (
        task_id
        and isinstance(task, BatchTask)
        and path.name in {"simulation_data.hdf5", "simulation_data.hdf5.gz"}
    ):
        path = path.with_name(path.name.replace("simulation", "cm"))
    if task_id is None:
        # No server task to fetch from: the file must already exist locally (cache hit).
        if not path.exists():
            raise FileNotFoundError("Cached file not found.")
    elif not path.exists() or replace_existing:
        download(task_id=task_id, path=path, verbose=verbose, progress_callback=progress_callback)
    if verbose and task_id is not None:
        console = get_logging_console()
        if isinstance(task, BatchTask):
            console.log(f"Loading component modeler data from {path}")
        else:
            console.log(f"Loading simulation from {path}")
    stub_data = Tidy3dStubData.postprocess(path, lazy=lazy)
    # If a local cache is configured, record this result for later re-use.
    simulation_cache = resolve_local_cache()
    if simulation_cache is not None and task_id is not None:
        info = get_info(task_id, verbose=False)
        workflow_type = getattr(info, "taskType", None)
        if (
            workflow_type != TaskType.MODE_SOLVER.name
        ):  # we cannot get the simulation from data or web for mode solver
            simulation = None
            if lazy:  # get simulation via web to avoid unpacking of lazy object in store_result
                try:
                    with tempfile.NamedTemporaryFile(suffix=".hdf5") as tmp_file:
                        simulation = load_simulation(task_id, path=tmp_file.name, verbose=False)
                except Exception as e:
                    # Best-effort caching: a failure here must not break the load itself,
                    # so return the data without storing it in the cache.
                    log.info(f"Failed to load simulation for storing results: {e}.")
                    return stub_data
            simulation_cache.store_result(
                stub_data=stub_data,
                task_id=task_id,
                path=path,
                workflow_type=workflow_type,
                simulation=simulation,
            )
    return stub_data
def _monitor_modeler_batch(
    task_id: str,
    verbose: bool = True,
    max_detail_tasks: int = 20,
) -> None:
    """Monitor modeler batch progress with aggregate and per-task views.

    Polls the server until the batch reaches an end state. In verbose mode a rich
    progress display shows an aggregate "Run Total" bar plus one bar per subtask
    (only when the batch has at most ``max_detail_tasks`` subtasks). On completion
    the billed cost is queried via :func:`real_cost`.

    Parameters
    ----------
    task_id : str
        Unique identifier of the modeler batch task on the server.
    verbose : bool = True
        If ``True``, display progress bars and status; otherwise poll silently.
    max_detail_tasks : int = 20
        Maximum number of subtasks for which individual progress bars are created.
    """
    console = get_logging_console() if verbose else None
    task = BatchTask.get(task_id=task_id)
    detail = _get_batch_detail_handle_error_status(task)
    name = detail.name or "modeler_batch"
    group_id = detail.groupId
    status = detail.status.lower()
    # Non-verbose path: poll without progress bars then return
    if not verbose:
        # Run phase
        while status_to_stage(status)[0] not in END_STATES:
            time.sleep(REFRESH_TIME)
            detail = _get_batch_detail_handle_error_status(task)
            status = detail.status.lower()
        return
    progress_columns = (
        TextColumn("[progress.description]{task.description}"),
        BarColumn(bar_width=25),
        TaskProgressColumn(),
        TimeElapsedColumn(),
    )
    # Make the header
    header = f"Subtasks status - {name}"
    if group_id:
        header += f"\nGroup ID: '{group_id}'"
    console.log(header)
    with Progress(*progress_columns, console=console, transient=False) as progress:
        # Phase: Run (aggregate + per-task)
        p_run = progress.add_task("Run Total", total=1.0)
        task_bars: dict[str, int] = {}
        stage = status_to_stage(status)[0]
        prev_stage = status_to_stage(status)[0]
        console.log(f"Batch status = {status}")
        # Note: get_status errors if an erroring status occurred
        while stage not in END_STATES:
            total = len(detail.tasks)
            r = detail.runSuccess or 0
            if stage != prev_stage:
                prev_stage = stage
                console.log(f"Batch status = {stage}")
            # Create per-task bars as soon as tasks appear
            if total and total <= max_detail_tasks and detail.tasks:
                name_to_task = {(t.taskName or t.taskId): t for t in (detail.tasks or [])}
                # Loop variable is 'tname' (not 'name') so the batch name captured
                # above is not clobbered; matches the per-task update loop below.
                for tname, t in name_to_task.items():
                    if tname not in task_bars:
                        tstatus = (t.status or "draft").lower()
                        pbar = progress.add_task(
                            f"  {tname}",
                            total=1.0,
                            completed=STATE_PROGRESS_PERCENTAGE[tstatus] / 100,
                        )
                        task_bars[tname] = pbar
            # Aggregate run progress: average stage fraction across tasks
            if detail.tasks:
                acc = 0.0
                n_members = 0
                for t in detail.tasks or []:
                    n_members += 1
                    tstatus = (t.status or "draft").lower()
                    _, idx = status_to_stage(tstatus)
                    acc += max(0.0, min(1.0, idx / MAX_STEPS))
                run_frac = (acc / float(n_members)) if n_members else 0.0
            else:
                run_frac = (r / total) if total else 0.0
            progress.update(p_run, completed=run_frac)
            # Update per-task bars
            if task_bars and detail.tasks:
                name_to_task = {(t.taskName or t.taskId): t for t in (detail.tasks or [])}
                for tname, pbar in task_bars.items():
                    t = name_to_task.get(tname)
                    if not t:
                        continue
                    tstatus = (t.status or "draft").lower()
                    desc = f"  {tname} [{tstatus or 'draft'}]"
                    progress.update(
                        pbar,
                        completed=STATE_PROGRESS_PERCENTAGE[tstatus] / 100,
                        description=desc,
                        refresh=False,
                    )
            # Single repaint per poll cycle (per-task updates above defer refresh).
            progress.refresh()
            time.sleep(REFRESH_TIME)
            detail = _get_batch_detail_handle_error_status(task)
            status = detail.status.lower()
            stage = status_to_stage(status)[0]
    if console is not None:
        console.log("Modeler has finished running successfully.")
    real_cost(task.task_id, verbose=verbose)
[docs]
@wait_for_connection
def delete(task_id: TaskId, versions: bool = False) -> TaskInfo:
    """Delete server-side data associated with task.

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.
    versions : bool = False
        If ``True``, delete all versions of the task in the task group. Otherwise, delete only the version associated with the current task ID.

    Returns
    -------
    :class:`.TaskInfo`
        Object containing information about status, size, credits of task.
    """
    if not task_id:
        raise ValueError("Task id not found.")
    web_task = TaskFactory.get(task_id, verbose=False)
    web_task.delete(versions)
    # Seed taskId first so any taskId key in the task's own fields takes precedence,
    # matching the original merge order.
    info_fields = {"taskId": web_task.task_id}
    info_fields.update(web_task.dict())
    return TaskInfo(**info_fields)
@wait_for_connection
def download_simulation(
    task_id: TaskId,
    path: PathLike = SIM_FILE_HDF5,
    verbose: bool = True,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> None:
    """Download the ``.hdf5`` file associated with the :class:`.Simulation` of a given task.

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.
    path : PathLike = "simulation.hdf5"
        Download path to .hdf5 file of simulation (including filename).
    verbose : bool = True
        If ``True``, will print progressbars and status, otherwise, will run silently.
    progress_callback : Callable[[float], None] = None
        Optional callback function called when downloading file with ``bytes_in_chunk`` as argument.
    """
    web_task = TaskFactory.get(task_id, verbose=False)
    if isinstance(web_task, BatchTask):
        raise NotImplementedError("Operation not implemented for modeler batches.")
    # Mode solver tasks keep their simulation under a different remote filename.
    is_mode_solver = get_info(task_id, verbose=False).taskType == "MODE_SOLVER"
    remote_sim_file = MODE_FILE_HDF5_GZ if is_mode_solver else SIM_FILE_HDF5_GZ
    web_task.get_simulation_hdf5(
        path,
        verbose=verbose,
        progress_callback=progress_callback,
        remote_sim_file=remote_sim_file,
    )
[docs]
@wait_for_connection
def get_tasks(
    num_tasks: Optional[int] = None, order: Literal["new", "old"] = "new", folder: str = "default"
) -> list[dict]:
    """Get a list with the metadata of the last ``num_tasks`` tasks.

    Parameters
    ----------
    num_tasks : int = None
        The number of tasks to return, or, if ``None``, return all.
    order : Literal["new", "old"] = "new"
        Return the tasks in order of newest-first or oldest-first.
    folder: str = "default"
        Folder from which to get the tasks.

    Returns
    -------
    List[Dict]
        List of dictionaries storing the information for each of the tasks last ``num_tasks`` tasks.
    """
    folder_obj = Folder.get(folder, create=True)
    tasks = folder_obj.list_tasks()
    if not tasks:
        return []
    # Sort by creation time; any other 'order' value leaves the server order untouched.
    if order in ("new", "old"):
        tasks = sorted(tasks, key=lambda t: t.created_at, reverse=(order == "new"))
    if num_tasks is not None:
        tasks = tasks[:num_tasks]
    return [t.dict() for t in tasks]
[docs]
@wait_for_connection
def estimate_cost(
    task_id: str, verbose: bool = True, solver_version: Optional[str] = None
) -> float:
    """Compute the maximum FlexCredit charge for a given task.

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.
    verbose : bool = True
        Whether to log the cost and helpful messages.
    solver_version : str = None
        Target solver version.

    Returns
    -------
    float
        Estimated maximum cost for :class:`.Simulation` associated with given ``task_id``.

    Note
    ----
    Cost is calculated assuming the simulation runs for
    the full ``run_time``. If early shut-off is triggered, the cost is adjusted proportionately.
    A minimum simulation cost may also apply, which depends on the task details.

    Notes
    -----
    We can get the cost estimate of running the task before actually running it. This prevents us from
    accidentally running large jobs that we set up by mistake. The estimated cost is the maximum cost
    corresponding to running all the time steps.

    Examples
    --------
    Basic example:

    .. code-block:: python

        # initializes job, puts task on server (but doesn't run it)
        job = web.Job(simulation=sim, task_name="job", verbose=verbose)
        # estimate the maximum cost
        estimated_cost = web.estimate_cost(job.task_id)
        print(f'The estimated maximum cost is {estimated_cost:.3f} Flex Credits.')
    """
    if not isinstance(task_id, str):
        raise ValueError(
            f"Task ID: {task_id} is not a string. You can get it using 'web.upload(<WorkflowType>)'."
        )
    console = get_logging_console() if verbose else None
    task = TaskFactory.get(task_id, verbose=False)
    detail = task.detail()
    if isinstance(task, BatchTask):
        # Batch path: trigger a validation check, then poll until a post-validate state.
        check_task_type = "FDTD" if detail.taskType == "MODAL_CM" else "RF_FDTD"
        task.check(solver_version=solver_version, check_task_type=check_task_type)
        detail = task.detail()
        status = detail.status.lower()
        while status not in ALL_POST_VALIDATE_STATES:
            time.sleep(REFRESH_TIME)
            detail = task.detail()
            status = detail.status.lower()
        if status in ERROR_STATES:
            _batch_detail_error(resource_id=task_id)
        est_flex_unit = detail.estFlexUnit
        if verbose:
            console.log(
                f"Maximum FlexCredit cost: {est_flex_unit:1.3f}. Minimum cost depends on "
                "task execution details. Use 'web.real_cost(task_id)' after run."
            )
        return est_flex_unit
    # simulation path
    task.estimate_cost(solver_version=solver_version)
    task_info = get_info(task_id)
    status = task_info.metadataStatus
    # Wait for a termination status
    while status not in ALL_POST_VALIDATE_STATES:
        time.sleep(REFRESH_TIME)
        task_info = get_info(task_id)
        status = task_info.metadataStatus
    if status in ERROR_STATES:
        try:
            # Try to obtain the error message
            task = SimulationTask(taskId=task_id)
            with tempfile.NamedTemporaryFile(suffix=".json") as tmp_file:
                # Read inside the with-block so the temp file still exists.
                task.get_error_json(to_file=tmp_file.name, validation=True)
                with open(tmp_file.name) as f:
                    error_content = json.load(f)
                error_msg = error_content["validation_error"]
        except Exception:
            # If the error message could not be obtained, raise a generic error message
            error_msg = "Error message could not be obtained, please contact customer support."
        raise WebError(f"Error estimating cost for task {task_id}! {error_msg}")
    if verbose:
        console.log(
            f"Estimated FlexCredit cost: {task_info.estFlexUnit:1.3f}. Minimum cost depends on "
            "task execution details. Use 'web.real_cost(task_id)' to get the billed FlexCredit "
            "cost after a simulation run."
        )
        # Optional cost breakdown, shown only when the server reports the components.
        fc_mode = task_info.estFlexCreditMode
        fc_post = task_info.estFlexCreditPostProcess
        if fc_mode:
            console.log(f"  {fc_mode:1.3f} FlexCredit of the total cost from mode solves.")
        if fc_post:
            console.log(f"  {fc_post:1.3f} FlexCredit of the total cost from post-processing.")
    return task_info.estFlexUnit
[docs]
@wait_for_connection
def real_cost(task_id: str, verbose: bool = True) -> float | None:
    """Get the billed cost for given task after it has been run.

    Parameters
    ----------
    task_id : str
        Unique identifier of task on server. Returned by :meth:`upload`.
    verbose : bool = True
        Whether to log the cost and helpful messages.

    Returns
    -------
    float
        The flex credit cost that was billed for the given ``task_id``.

    Note
    ----
    The billed cost may not be immediately available when the task status is set to ``success``,
    but should be available shortly after.

    Examples
    --------
    To obtain the cost of a simulation, you can use the function ``tidy3d.web.real_cost(task_id)``. In the example
    below, a job is created, and its cost is estimated. After running the simulation, the real cost can be obtained.

    .. code-block:: python

        import time
        # initializes job, puts task on server (but doesn't run it)
        job = web.Job(simulation=sim, task_name="job", verbose=verbose)
        # estimate the maximum cost
        estimated_cost = web.estimate_cost(job.task_id)
        print(f'The estimated maximum cost is {estimated_cost:.3f} Flex Credits.')
        # Runs the simulation.
        sim_data = job.run(path="data/sim_data.hdf5")
        time.sleep(5)
        # Get the billed FlexCredit cost after a simulation run.
        cost = web.real_cost(job.task_id)
    """
    if not isinstance(task_id, str):
        raise ValueError(
            f"Task ID: {task_id} is not a string. You can get it using 'web.upload(<WorkflowType>)'."
        )
    console = get_logging_console() if verbose else None
    info = get_info(task_id)
    billed = info.realFlexUnit
    # Pre-proration cost; falls back to the billed value when the server omits it.
    original_billed = getattr(info, "oriRealFlexUnit", billed)
    if not billed:
        log.warning(
            f"Billed FlexCredit for task '{task_id}' is not available. If the task has been "
            "successfully run, it should be available shortly."
        )
        return billed
    if verbose:
        console.log(f"Billed flex credit cost: {billed:1.3f}.")
        if billed != original_billed and "FDTD" in info.taskType:
            console.log(
                "Note: the task cost pro-rated due to early shutoff was below the minimum "
                "threshold, due to fast shutoff. Decreasing the simulation 'run_time' should "
                "decrease the estimated, and correspondingly the billed cost of such tasks."
            )
    return billed
[docs]
@wait_for_connection
def account(verbose: bool = True) -> Account:
    """Get account information including FlexCredit balance and usage limits.

    Parameters
    ----------
    verbose : bool = True
        If ``True``, prints account information including credit balance, expiration,
        and free simulation counts.

    Returns
    -------
    Account
        Object containing account information such as credit balance, expiration dates,
        and daily free simulation counts.

    Examples
    --------
    Get account information:

    .. code-block:: python

        account_info = web.account()
        # Displays:
        # Current FlexCredit balance: 10.00 and expiration date: 2024-12-31 23:59:59.
        # Remaining daily free simulations: 3.
    """
    account_info = Account.get()
    if verbose and account_info:
        console = get_logging_console()
        # Assemble the summary from the pieces the server actually reported.
        parts: list[str] = []
        credit = account_info.credit
        if credit is not None:
            parts.append(f"Current FlexCredit balance: {credit:.2f}")
            credit_expiration = account_info.credit_expiration
            if credit_expiration is not None:
                parts.append(
                    f" and expiration date: {credit_expiration.strftime('%Y-%m-%d %H:%M:%S')}. "
                )
            else:
                parts.append(". ")
        cycle_type = account_info.allowance_cycle_type
        cycle_amount = account_info.allowance_current_cycle_amount
        cycle_end_date = account_info.allowance_current_cycle_end_date
        if cycle_type is not None and cycle_amount is not None and cycle_end_date is not None:
            cycle_end = cycle_end_date.strftime("%Y-%m-%d %H:%M:%S")
            parts.append(
                f"{cycle_type} FlexCredit balance: {cycle_amount:.2f} and expiration date: {cycle_end}. "
            )
        free_simulation_counts = account_info.daily_free_simulation_counts
        if free_simulation_counts is not None:
            parts.append(f"Remaining daily free simulations: {free_simulation_counts}.")
        console.log("".join(parts))
    return account_info
[docs]
@wait_for_connection
def test() -> None:
    """Confirm whether Tidy3D authentication is configured.

    Raises
    ------
    WebError
        If Tidy3D authentication is not configured correctly.

    Notes
    -----
    This method tests the authentication configuration by attempting to retrieve
    the task list. If authentication is not properly set up, it will raise an
    exception with instructions on how to configure authentication.

    Examples
    --------
    Test authentication:

    .. code-block:: python

        web.test()
        # If successful, displays:
        # Authentication configured successfully!
    """
    try:
        # note, this is a little slow, but the only call that doesn't require providing a task id.
        get_tasks(num_tasks=0)
    except (WebError, HTTPError) as e:
        url = "https://docs.flexcompute.com/projects/tidy3d/en/latest/index.html"
        msg = (
            str(e)
            + "\n\n"
            + "It looks like the Tidy3D Python interface is not configured with your "
            "unique API key. "
            "To get your API key, sign into 'https://tidy3d.simulation.cloud' and copy it "
            "from your 'Account' page. Then you can configure tidy3d through command line "
            "'tidy3d configure' (recommended). Alternatively, one can manually create the configuration "
            "file by creating a file at your home directory '~/.tidy3d/config' (unix) or "
            "'.tidy3d/config' (windows) with content like: \n\n"
            "apikey = 'XXX' \n\nHere XXX is your API key copied from your account page within quotes.\n\n"
            f"For details, check the instructions at {url}."
        )
        raise WebError(msg) from e
    get_logging_console().log("Authentication configured successfully!")