Source code for flow360.component.case

"""
Case component
"""

from __future__ import annotations

import json
import tempfile
import time
from typing import Any, Iterator, List, Union

import pydantic as pd

from .. import error_messages
from ..cloud.requests import MoveCaseItem, MoveToFolderRequest
from ..cloud.rest_api import RestApi
from ..exceptions import Flow360RuntimeError, Flow360ValidationError, Flow360ValueError
from ..log import log
from .flow360_params.flow360_params import Flow360Params, UnvalidatedFlow360Params
from .folder import Folder
from .interfaces import CaseInterface, FolderInterface, VolumeMeshInterface
from .resource_base import (
    Flow360Resource,
    Flow360ResourceBaseModel,
    Flow360ResourceListBase,
    Flow360Status,
    ResourceDraft,
    before_submit_only,
    is_object_cloud_resource,
)
from .results.case_results import (
    ActuatorDiskResultCSVModel,
    AeroacousticsResultCSVModel,
    BETForcesResultCSVModel,
    CaseDownloadable,
    CFLResultCSVModel,
    ForceDistributionResultCSVModel,
    LinearResidualsResultCSVModel,
    MaxResidualLocationResultCSVModel,
    MinMaxStateResultCSVModel,
    MonitorsResultModel,
    NonlinearResidualsResultCSVModel,
    ResultBaseModel,
    ResultsDownloaderSettings,
    ResultTarGZModel,
    SurfaceForcesResultCSVModel,
    SurfaceHeatTrasferResultCSVModel,
    TotalForcesResultCSVModel,
    UserDefinedDynamicsResultModel,
)
from .utils import is_valid_uuid, shared_account_confirm_proceed, validate_type
from .validator import Validator


class CaseBase:
    """
    Case Base component
    """

    def copy(
        self,
        name: str = None,
        params: Flow360Params = None,
        solver_version: str = None,
        tags: List[str] = None,
    ) -> CaseDraft:
        """
        Alias for retry case
        :param name:
        :param params:
        :param tags:
        :return:
        """

        return self.retry(name, params, solver_version=solver_version, tags=tags)

    # pylint: disable=no-member
    def retry(
        self,
        name: str = None,
        params: Flow360Params = None,
        solver_version: str = None,
        tags: List[str] = None,
    ) -> CaseDraft:
        """
        Retry case
        :param name:
        :param params:
        :param tags:
        :return:
        """

        name = name or self.name or self.info.name
        params = params or self.params.copy(deep=True)
        new_case = Case.create(
            name, params, other_case=self, solver_version=solver_version, tags=tags
        )
        return new_case

    def continuation(
        self, name: str = None, params: Flow360Params = None, tags: List[str] = None
    ) -> CaseDraft:
        """
        Alias for fork a case to continue simulation
        :param name:
        :param params:
        :param tags:
        :return:
        """

        return self.fork(name, params, tags)

    # pylint: disable=no-member
    def fork(
        self, name: str = None, params: Flow360Params = None, tags: List[str] = None
    ) -> CaseDraft:
        """
        Fork a case to continue simulation
        :param name:
        :param params:
        :param tags:
        :return:
        """

        name = name or self.name or self.info.name
        params = params or self.params.copy(deep=True)
        return Case.create(name, params, parent_case=self, tags=tags)


class CaseMeta(Flow360ResourceBaseModel):
    """
    CaseMeta data component
    """

    id: str = pd.Field(alias="caseId")
    case_mesh_id: str = pd.Field(alias="caseMeshId")
    parent_id: Union[str, None] = pd.Field(alias="parentId")
    status: Flow360Status = pd.Field()

    # pylint: disable=no-self-argument
    @pd.validator("status")
    def set_status_type(cls, value: Flow360Status):
        """set_status_type when case uploaded"""
        if value is Flow360Status.UPLOADED:
            return Flow360Status.CASE_UPLOADED
        return value

    def to_case(self) -> Case:
        """
        returns Case object from case meta info
        """
        return Case(self.id)


# pylint: disable=too-many-instance-attributes
class CaseDraft(CaseBase, ResourceDraft):
    """
    Case Draft component (before submission)
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        name: str,
        params: Flow360Params,
        volume_mesh_id: str = None,
        tags: List[str] = None,
        parent_id: str = None,
        other_case: Case = None,
        parent_case: Case = None,
        solver_version: str = None,
    ):
        self.name = name
        self.params = params
        self.volume_mesh_id = volume_mesh_id
        self.parent_case = parent_case
        self.parent_id = parent_id
        self.other_case = other_case
        self.tags = tags
        self.solver_version = solver_version
        self._id = None
        self._submitted_case = None
        ResourceDraft.__init__(self)

        self.validate_case_inputs()

    def __str__(self):
        return self.params.__str__()

    @property
    def params(self) -> Flow360Params:
        """
        returns case params
        """
        return self._params

    @params.setter
    def params(self, value: Flow360Params):
        """
        sets case params (before submit only)
        """
        if not isinstance(value, Flow360Params) and not isinstance(value, UnvalidatedFlow360Params):
            raise Flow360ValueError("params are not of type Flow360Params.")
        self._params = value

    @property
    def name(self) -> str:
        """
        returns case name
        """
        return self._name

    @name.setter
    def name(self, value) -> str:
        """
        sets case name
        """
        self._name = value

    @property
    def volume_mesh_id(self):
        """
        returns volume mesh id
        """
        return self._volume_mesh_id

    @volume_mesh_id.setter
    def volume_mesh_id(self, value):
        """
        sets volume mesh id
        """
        self._volume_mesh_id = value

    def to_case(self) -> Case:
        """Return Case from CaseDraft (must be after .submit())

        Returns
        -------
        Case
            Case representation

        Raises
        ------
        RuntimeError
            Raises error when case is before submission, i.e., is in draft state
        """
        if not self.is_cloud_resource():
            raise Flow360RuntimeError(
                f"Case name={self.name} is in draft state. Run .submit() before calling this function."
            )
        return Case(self.id)

    @before_submit_only
    def submit(self, force_submit: bool = False) -> Case:
        """
        submits case to cloud for running
        """
        assert self.name
        assert self.volume_mesh_id or self.other_case or self.parent_id or self.parent_case
        assert self.params

        self.validate_case_inputs(pre_submit_checks=True)

        if not shared_account_confirm_proceed():
            raise Flow360ValueError("User aborted resource submit.")

        volume_mesh_id = self.volume_mesh_id
        parent_id = self.parent_id
        if parent_id is not None:
            self.parent_case = Case(self.parent_id)

        if isinstance(self.parent_case, CaseDraft):
            self.parent_case = self.parent_case.to_case()

        if isinstance(self.other_case, CaseDraft):
            self.other_case = self.other_case.to_case()

        if self.other_case is not None and self.other_case.has_parent():
            self.parent_case = self.other_case.parent

        if self.parent_case is not None:
            parent_id = self.parent_case.id
            volume_mesh_id = self.parent_case.volume_mesh_id

            if (
                self.solver_version is not None
                and self.parent_case.solver_version != self.solver_version
            ):
                raise Flow360RuntimeError(
                    error_messages.change_solver_version_error(
                        self.parent_case.solver_version, self.solver_version
                    )
                )
            self.solver_version = self.parent_case.solver_version

        volume_mesh_id = volume_mesh_id or self.other_case.volume_mesh_id

        if self.solver_version is None:
            volume_mesh_info = Flow360ResourceBaseModel(
                **RestApi(VolumeMeshInterface.endpoint, id=volume_mesh_id).get()
            )
            self.solver_version = volume_mesh_info.solver_version

        is_valid_uuid(volume_mesh_id)
        self.validator_api(
            self.params,
            volume_mesh_id=volume_mesh_id,
            solver_version=self.solver_version,
            raise_on_error=(not force_submit),
        )

        data = {
            "name": self.name,
            "meshId": volume_mesh_id,
            "runtimeParams": self.params.flow360_json(),
            "tags": self.tags,
            "parentId": parent_id,
        }

        if self.solver_version is not None:
            data["solverVersion"] = self.solver_version

        resp = RestApi(CaseInterface.endpoint).post(
            json=data,
            path=f"volumemeshes/{volume_mesh_id}/case",
        )
        info = CaseMeta(**resp)
        self._id = info.id

        self._submitted_case = Case(self.id)
        log.info(f"Case successfully submitted: {self._submitted_case.short_description()}")
        return self._submitted_case

    def validate_case_inputs(self, pre_submit_checks=False):
        """
        validates case inputs (before submit only)
        """
        if self.volume_mesh_id is not None and self.other_case is not None:
            raise Flow360ValueError("You cannot specify both volume_mesh_id AND other_case.")

        if self.parent_id is not None and self.parent_case is not None:
            raise Flow360ValueError("You cannot specify both parent_id AND parent_case.")

        if self.parent_id is not None or self.parent_case is not None:
            if self.volume_mesh_id is not None or self.other_case is not None:
                raise Flow360ValueError(
                    "You cannot specify volume_mesh_id OR other_case when parent case provided."
                )

        is_valid_uuid(self.volume_mesh_id, allow_none=True)

        if pre_submit_checks:
            is_object_cloud_resource(self.other_case)
            is_object_cloud_resource(self.parent_case)

    @classmethod
    def validator_api(
        cls,
        params: Flow360Params,
        volume_mesh_id,
        solver_version: str = None,
        raise_on_error: bool = True,
    ):
        """
        validation api: validates case parameters before submitting
        """
        return Validator.CASE.validate(
            params,
            mesh_id=volume_mesh_id,
            solver_version=solver_version,
            raise_on_error=raise_on_error,
        )


# pylint: disable=too-many-instance-attributes
[docs] class Case(CaseBase, Flow360Resource): """ Case component """ # pylint: disable=redefined-builtin def __init__(self, id: str): super().__init__( interface=CaseInterface, info_type_class=CaseMeta, id=id, ) self._params = None self._raw_params = None self._results = CaseResultsModel(case=self) @classmethod def _from_meta(cls, meta: CaseMeta): validate_type(meta, "meta", CaseMeta) case = cls(id=meta.id) case._set_meta(meta) return case @property def params(self) -> Flow360Params: """ returns case params """ if self._params is None: self._raw_params = json.loads(self.get(method="runtimeParams")["content"]) try: with tempfile.NamedTemporaryFile( mode="w", suffix=".json", delete=False ) as temp_file: json.dump(self._raw_params, temp_file) self._params = Flow360Params(temp_file.name) except pd.ValidationError as err: raise Flow360ValidationError(error_messages.params_fetching_error(err)) from err return self._params @property def params_as_dict(self) -> dict: """ returns case params as dictionary """ if self._raw_params is None: self._raw_params = json.loads(self.get(method="runtimeParams")["content"]) return self._raw_params
[docs] def has_parent(self) -> bool: """Check if case has parent case Returns ------- bool True when case has parent, False otherwise """ return self.info.parent_id is not None
@property def parent(self) -> Case: """parent case Returns ------- Case parent case object Raises ------ RuntimeError When case does not have parent """ if self.has_parent(): return Case(self.info.parent_id) raise Flow360RuntimeError("Case does not have parent case.") @property def info(self) -> CaseMeta: """ returns metadata info for case """ return super().info @property def volume_mesh_id(self): """ returns volume mesh id """ return self.info.case_mesh_id @property def results(self) -> CaseResultsModel: """ returns results object to managing case results """ return self._results
[docs] def is_steady(self): """ returns True when case is steady state """ return self.params.time_stepping.time_step_size == "inf"
[docs] def has_actuator_disks(self): """ returns True when case has actuator disk """ if self.params.actuator_disks is not None: if len(self.params.actuator_disks) > 0: return True return False
[docs] def has_bet_disks(self): """ returns True when case has BET disk """ if self.params.bet_disks is not None: if len(self.params.bet_disks) > 0: return True return False
[docs] def has_isosurfaces(self): """ returns True when case has isosurfaces """ return self.params.iso_surface_output is not None
[docs] def has_monitors(self): """ returns True when case has monitors """ return self.params.monitor_output is not None
[docs] def has_aeroacoustics(self): """ returns True when case has aeroacoustics """ return self.params.aeroacoustic_output is not None
[docs] def has_user_defined_dynamics(self): """ returns True when case has user defined dynamics """ return self.params.user_defined_dynamics is not None
[docs] def is_finished(self): """ returns False when case is in running or preprocessing state """ return self.status.is_final()
[docs] def move_to_folder(self, folder: Folder): """ Move the current case to the specified folder. Parameters ---------- folder : Folder The destination folder where the item will be moved. Returns ------- self Returns the modified item after it has been moved to the new folder. Notes ----- This method sends a REST API request to move the current item to the specified folder. The `folder` parameter should be an instance of the `Folder` class with a valid ID. """ RestApi(FolderInterface.endpoint).put( MoveToFolderRequest(dest_folder_id=folder.id, items=[MoveCaseItem(id=self.id)]).dict(), method="move", ) return self
@classmethod def _interface(cls): return CaseInterface @classmethod def _meta_class(cls): """ returns case meta info class: CaseMeta """ return CaseMeta @classmethod def _params_ancestor_id_name(cls): """ returns volumeMeshId name """ return "meshId"
[docs] @classmethod def from_cloud(cls, case_id: str): """ get case from cloud """ return cls(case_id)
# pylint: disable=too-many-arguments
[docs] @classmethod def create( cls, name: str, params: Flow360Params, volume_mesh_id: str = None, tags: List[str] = None, parent_id: str = None, other_case: Case = None, parent_case: Case = None, solver_version: str = None, ) -> CaseDraft: """ Create new case :param name: :param params: :param volume_mesh_id: :param other_case: :param tags: :param parent_id: :param parent_case: :return: """ assert name assert volume_mesh_id or other_case or parent_id or parent_case assert params if not isinstance(params, Flow360Params) and not isinstance( params, UnvalidatedFlow360Params ): raise Flow360ValueError("params are not of type Flow360Params.") new_case = CaseDraft( name=name, volume_mesh_id=volume_mesh_id, params=params.copy(), parent_id=parent_id, other_case=other_case, parent_case=parent_case, tags=tags, solver_version=solver_version, ) return new_case
[docs] def wait(self, timeout_minutes=60): """Wait until the Case finishes processing, refresh periodically""" start_time = time.time() while self.is_finished() is False: if time.time() - start_time > timeout_minutes * 60: raise TimeoutError( "Timeout: Process did not finish within the specified timeout period" ) time.sleep(2)
# pylint: disable=unnecessary-lambda class CaseResultsModel(pd.BaseModel): """ Pydantic models for case results """ case: Any = pd.Field() # tar.gz results: surfaces: ResultTarGZModel = pd.Field( default_factory=lambda: ResultTarGZModel(remote_file_name=CaseDownloadable.SURFACES.value) ) volumes: ResultTarGZModel = pd.Field( default_factory=lambda: ResultTarGZModel(remote_file_name=CaseDownloadable.VOLUMES.value) ) slices: ResultTarGZModel = pd.Field( default_factory=lambda: ResultTarGZModel(remote_file_name=CaseDownloadable.SLICES.value) ) isosurfaces: ResultTarGZModel = pd.Field( default_factory=lambda: ResultTarGZModel( remote_file_name=CaseDownloadable.ISOSURFACES.value ) ) monitors: MonitorsResultModel = pd.Field(MonitorsResultModel()) # convergence: nonlinear_residuals: NonlinearResidualsResultCSVModel = pd.Field( default_factory=lambda: NonlinearResidualsResultCSVModel() ) linear_residuals: LinearResidualsResultCSVModel = pd.Field( default_factory=lambda: LinearResidualsResultCSVModel() ) cfl: CFLResultCSVModel = pd.Field(default_factory=lambda: CFLResultCSVModel()) minmax_state: MinMaxStateResultCSVModel = pd.Field( default_factory=lambda: MinMaxStateResultCSVModel() ) max_residual_location: MaxResidualLocationResultCSVModel = pd.Field( default_factory=lambda: MaxResidualLocationResultCSVModel() ) # forces total_forces: TotalForcesResultCSVModel = pd.Field( default_factory=lambda: TotalForcesResultCSVModel() ) surface_forces: SurfaceForcesResultCSVModel = pd.Field( default_factory=lambda: SurfaceForcesResultCSVModel() ) actuator_disks: ActuatorDiskResultCSVModel = pd.Field( default_factory=lambda: ActuatorDiskResultCSVModel() ) bet_forces: BETForcesResultCSVModel = pd.Field( default_factory=lambda: BETForcesResultCSVModel() ) force_distribution: ForceDistributionResultCSVModel = pd.Field( default_factory=lambda: ForceDistributionResultCSVModel() ) # user defined: user_defined_dynamics: UserDefinedDynamicsResultModel = pd.Field( default_factory=lambda: UserDefinedDynamicsResultModel() ) # others surface_heat_transfer: SurfaceHeatTrasferResultCSVModel = pd.Field( default_factory=lambda: SurfaceHeatTrasferResultCSVModel() ) aeroacoustics: AeroacousticsResultCSVModel = pd.Field( default_factory=lambda: AeroacousticsResultCSVModel() ) _downloader_settings: ResultsDownloaderSettings = pd.PrivateAttr(ResultsDownloaderSettings()) # pylint: disable=no-self-argument, protected-access @pd.root_validator(pre=False) def pass_download_function(cls, values): """ Pass download methods into fields of the case results """ if "case" not in values: raise ValueError("case (type Case) is required") if not isinstance(values["case"], Case): raise TypeError("case must be of type Case") for field in cls.__fields__.values(): if field.name in values.keys(): value = values[field.name] if isinstance(value, ResultBaseModel): value._download_method = values["case"]._download_file value._get_params_method = lambda: values["case"].params values[field.name] = value return values # pylint: disable=no-self-argument, protected-access @pd.validator("monitors", "user_defined_dynamics", always=True) def pass_get_files_function(cls, value, values): """ Pass file getters into fields of the case results """ value.get_download_file_list_method = values["case"].get_download_file_list return value # pylint: disable=no-self-argument, protected-access @pd.validator("bet_forces", always=True) def pass_has_bet_forces_function(cls, value, values): """ Pass check to see if result is downloadable based on params """ value._is_downloadable = values["case"].has_bet_disks return value # pylint: disable=no-self-argument, protected-access @pd.validator("actuator_disks", always=True) def pass_has_actuator_disks_function(cls, value, values): """ Pass check to see if result is downloadable based on params """ value._is_downloadable = values["case"].has_actuator_disks return value # pylint: disable=no-self-argument, protected-access @pd.validator("isosurfaces", always=True) def pass_has_isosurfaces_function(cls, value, values): """ Pass check to see if result is downloadable based on params """ value._is_downloadable = values["case"].has_isosurfaces return value # pylint: disable=no-self-argument, protected-access @pd.validator("monitors", always=True) def pass_has_monitors_function(cls, value, values): """ Pass check to see if result is downloadable based on params """ value._is_downloadable = values["case"].has_monitors return value # pylint: disable=no-self-argument, protected-access @pd.validator("aeroacoustics", always=True) def pass_has_aeroacoustics_function(cls, value, values): """ Pass check to see if result is downloadable based on params """ value._is_downloadable = values["case"].has_aeroacoustics return value # pylint: disable=no-self-argument, protected-access @pd.validator("user_defined_dynamics", always=True) def pass_has_user_defined_dynamics_function(cls, value, values): """ Pass check to see if result is downloadable based on params """ value._is_downloadable = values["case"].has_user_defined_dynamics return value def _execute_downloading(self): """ Download all specified and available results for the case """ for _, value in self.__dict__.items(): if isinstance(value, ResultBaseModel): # we download if explicitly set set_downloader(<result_name>=True), # or all=True but only when is not result=False try_download = value.do_download is True if self._downloader_settings.all is True and value.do_download is not False: try_download = value._is_downloadable() is True if try_download is True: value.download( to_folder=self._downloader_settings.destination, overwrite=self._downloader_settings.overwrite, ) def set_destination( self, folder_name: str = None, use_case_name: bool = None, use_case_id: bool = None ): """ Set the destination for downloading files. Parameters ---------- folder_name : str, optional The name of the folder where files will be downloaded. use_case_name : bool, optional Whether to use the use case name for the destination. use_case_id : bool, optional Whether to use the use case ID for the destination. Raises ------ ValueError If more than one argument is provided or if no arguments are provided. """ # Check if only one argument is provided if sum(arg is not None for arg in [folder_name, use_case_name, use_case_id]) != 1: raise ValueError("Exactly one argument should be provided.") if folder_name is not None: self._downloader_settings.destination = folder_name if use_case_name is True: self._downloader_settings.destination = self.case.name if use_case_id is True: self._downloader_settings.destination = self.case.id # pylint: disable=too-many-arguments, too-many-locals, redefined-builtin def download( self, surface: bool = None, volume: bool = None, slices: bool = None, isosurfaces: bool = None, monitors: bool = None, nonlinear_residuals: bool = None, linear_residuals: bool = None, cfl: bool = None, minmax_state: bool = None, max_residual_location: bool = None, surface_forces: bool = None, total_forces: bool = None, bet_forces: bool = None, actuator_disks: bool = None, force_distribution: bool = None, user_defined_dynamics: bool = None, aeroacoustics: bool = None, surface_heat_transfer: bool = None, all: bool = None, overwrite: bool = False, destination: str = None, ): """ Download result files associated with the case. Parameters ---------- surface : bool, optional Download surface result file if True. volume : bool, optional Download volume result file if True. nonlinear_residuals : bool, optional Download nonlinear residuals file if True. linear_residuals : bool, optional Download linear residuals file if True. cfl : bool, optional Download CFL file if True. minmax_state : bool, optional Download minmax state file if True. surface_forces : bool, optional Download surface forces file if True. total_forces : bool, optional Download total forces file if True. bet_forces : bool, optional Download BET (Blade Element Theory) forces file if True. actuator_disk_output : bool, optional Download actuator disk output file if True. all : bool, optional Download all result files if True. Ignore file if explicitly set: <result_name>=False overwrite : bool, optional If True, overwrite existing files with the same name in the destination. destination : str, optional Location to save downloaded files. If None, files will be saved in the current directory under ID folder. """ self.surfaces.do_download = surface self.volumes.do_download = volume self.slices.do_download = slices self.isosurfaces.do_download = isosurfaces self.monitors.do_download = monitors self.nonlinear_residuals.do_download = nonlinear_residuals self.linear_residuals.do_download = linear_residuals self.cfl.do_download = cfl self.minmax_state.do_download = minmax_state self.max_residual_location.do_download = max_residual_location self.surface_forces.do_download = surface_forces self.total_forces.do_download = total_forces self.bet_forces.do_download = bet_forces self.actuator_disks.do_download = actuator_disks self.force_distribution.do_download = force_distribution self.user_defined_dynamics.do_download = user_defined_dynamics self.aeroacoustics.do_download = aeroacoustics self.surface_heat_transfer.do_download = surface_heat_transfer self._downloader_settings.all = all self._downloader_settings.overwrite = overwrite if destination is not None: self.set_destination(folder_name=destination) self._execute_downloading() def download_file_by_name(self, file_name, to_file=None, to_folder=".", overwrite: bool = True): """ Download file by name """ return self.case._download_file( file_name=file_name, to_file=to_file, to_folder=to_folder, overwrite=overwrite ) class CaseList(Flow360ResourceListBase): """ Case List component """ def __init__( self, mesh_id: str = None, from_cloud: bool = True, include_deleted: bool = False, limit=100 ): super().__init__( ancestor_id=mesh_id, from_cloud=from_cloud, include_deleted=include_deleted, limit=limit, resourceClass=Case, ) def filter(self): """ flitering list, not implemented yet """ raise NotImplementedError("Filters are not implemented yet") # resp = list(filter(lambda i: i['caseStatus'] != 'deleted', resp)) # pylint: disable=useless-parent-delegation def __getitem__(self, index) -> Case: """ returns CaseMeta info item of the list """ return super().__getitem__(index) def __iter__(self) -> Iterator[Case]: return super().__iter__()