Source code for flow360.component.results.case_results

"""Case results module"""

# pylint: disable=too-many-lines

from __future__ import annotations

import re
from enum import Enum
from pathlib import Path
from typing import Callable, Dict, List, Optional, get_args

import numpy as np
import pydantic as pd

from flow360.component.results.base_results import (
    _PHYSICAL_STEP,
    _PSEUDO_STEP,
    _TIME,
    LocalResultCSVModel,
    PerEntityResultCSVModel,
    ResultBaseModel,
    ResultCSVModel,
    ResultTarGZModel,
)
from flow360.component.results.results_utils import (
    BETDiskCSVHeaderOperation,
    DiskCoefficientsComputation,
    PorousMediumCoefficientsComputation,
)
from flow360.component.simulation.conversion import unit_converter as unit_converter_v2
from flow360.component.simulation.outputs.output_fields import (
    _CD_PER_STRIP,
    _CUMULATIVE_CD_CURVE,
    _HEAT_FLUX,
    _X,
    _Y,
    ForceOutputCoefficientNames,
    _CFx_PER_SPAN,
    _CFz_PER_SPAN,
    _CMy_PER_SPAN,
)
from flow360.component.simulation.simulation_params import SimulationParams
from flow360.component.simulation.unit_system import (
    Flow360UnitSystem,
    ForceType,
    MomentType,
    PowerType,
    is_flow360_unit,
)
from flow360.component.v1.conversions import unit_converter as unit_converter_v1
from flow360.component.v1.flow360_params import Flow360Params
from flow360.exceptions import Flow360ValueError
from flow360.log import log


class CaseDownloadable(Enum):
    """
    Case results filenames
    """

    # tar.gz
    SURFACES = "surfaces.tar.gz"
    VOLUMES = "volumes.tar.gz"
    SLICES = "slices.tar.gz"
    ISOSURFACES = "isosurfaces.tar.gz"
    MONITORS_ALL = "monitors.tar.gz"

    # convergence:
    NONLINEAR_RESIDUALS = "nonlinear_residual_v2.csv"
    LINEAR_RESIDUALS = "linear_residual_v2.csv"
    CFL = "cfl_v2.csv"
    MINMAX_STATE = "minmax_state_v2.csv"
    MAX_RESIDUAL_LOCATION = "max_residual_location_v2.csv"

    # forces:
    SURFACE_FORCES = "surface_forces_v2.csv"
    TOTAL_FORCES = "total_forces_v2.csv"
    BET_FORCES = "bet_forces_v2.csv"
    BET_FORCES_RADIAL_DISTRIBUTION = "bet_forces_radial_distribution_v2.csv"
    ACTUATOR_DISKS = "actuatorDisk_output_v2.csv"
    POROUS_MEDIA = "porous_media_output_v2.csv"
    LEGACY_FORCE_DISTRIBUTION = "postprocess/forceDistribution.csv"
    Y_SLICING_FORCE_DISTRIBUTION = "Y_slicing_forceDistribution.csv"
    X_SLICING_FORCE_DISTRIBUTION = "X_slicing_forceDistribution.csv"

    # user defined:
    MONITOR_PATTERN = r"monitor_(.+)_v2.csv"
    USER_DEFINED_DYNAMICS_PATTERN = r"udd_(.+)_v2.csv"

    # others:
    AEROACOUSTICS = "total_acoustics_v3.csv"
    SURFACE_HEAT_TRANSFER = "surface_heat_transfer_v2.csv"


class ResultsDownloaderSettings(pd.BaseModel):
    """
    Settings for the results downloader.

    Parameters
    ----------
    all : bool, optional (default False)
        Flag indicating whether to download all available results.
    overwrite : bool, optional (default False)
        Flag indicating whether to overwrite existing files during download.
    destination : str, optional (default ".")
        The destination directory where the results will be downloaded.
    """

    all: Optional[bool] = pd.Field(False)
    overwrite: Optional[bool] = pd.Field(False)
    destination: Optional[str] = pd.Field(".")


class TimeSeriesResultCSVModel(ResultCSVModel):
    """Base CSV model for time series results"""

    _x_columns: List[str] = [_PHYSICAL_STEP, _PSEUDO_STEP]

    @property
    def x_columns(self):
        """Get x column"""
        return self._x_columns


# separate classes used to further customise give resutls, for example nonlinear_residuals.plot()
[docs] class NonlinearResidualsResultCSVModel(TimeSeriesResultCSVModel): """NonlinearResidualsResultCSVModel""" remote_file_name: str = pd.Field(CaseDownloadable.NONLINEAR_RESIDUALS.value, frozen=True)
[docs] class LinearResidualsResultCSVModel(TimeSeriesResultCSVModel): """LinearResidualsResultCSVModel""" remote_file_name: str = pd.Field(CaseDownloadable.LINEAR_RESIDUALS.value, frozen=True)
[docs] class CFLResultCSVModel(TimeSeriesResultCSVModel): """CFLResultCSVModel""" remote_file_name: str = pd.Field(CaseDownloadable.CFL.value, frozen=True)
[docs] class MinMaxStateResultCSVModel(TimeSeriesResultCSVModel): """CFLResultCSVModel""" remote_file_name: str = pd.Field(CaseDownloadable.MINMAX_STATE.value, frozen=True)
[docs] class MaxResidualLocationResultCSVModel(TimeSeriesResultCSVModel): """MaxResidualLocationResultCSVModel""" remote_file_name: str = pd.Field(CaseDownloadable.MAX_RESIDUAL_LOCATION.value, frozen=True)
[docs] class TotalForcesResultCSVModel(TimeSeriesResultCSVModel): """TotalForcesResultCSVModel""" remote_file_name: str = pd.Field(CaseDownloadable.TOTAL_FORCES.value, frozen=True)
[docs] class SurfaceForcesResultCSVModel(PerEntityResultCSVModel, TimeSeriesResultCSVModel): """SurfaceForcesResultCSVModel""" remote_file_name: str = pd.Field(CaseDownloadable.SURFACE_FORCES.value, frozen=True) _variables: List[str] = list(get_args(ForceOutputCoefficientNames)) def _preprocess(self, filter_physical_steps_only: bool = True, include_time: bool = True): """ run some processing after data is loaded """ super()._preprocess( filter_physical_steps_only=filter_physical_steps_only, include_time=include_time )
[docs] def reload_data(self, filter_physical_steps_only: bool = True, include_time: bool = True): return super().reload_data(filter_physical_steps_only, include_time)
class SurfaceForcesGroupResultCSVModel(SurfaceForcesResultCSVModel): """SurfaceForcesGroupResultCSVModel""" remote_file_name: str = pd.Field(None, frozen=True) # Unused dummy field
[docs] class LegacyForceDistributionResultCSVModel(ResultCSVModel): """ForceDistributionResultCSVModel""" remote_file_name: str = pd.Field(CaseDownloadable.LEGACY_FORCE_DISTRIBUTION.value, frozen=True)
[docs] class XSlicingForceDistributionResultCSVModel(PerEntityResultCSVModel): """ForceDistributionResultCSVModel""" remote_file_name: str = pd.Field( CaseDownloadable.X_SLICING_FORCE_DISTRIBUTION.value, frozen=True ) _variables: List[str] = [_CUMULATIVE_CD_CURVE, _CD_PER_STRIP] _filter_when_zero = [_CD_PER_STRIP] _x_columns: List[str] = [_X] def _preprocess(self, filter_physical_steps_only: bool = False, include_time: bool = False): """ add _CD_PER_STRIP for filtering purpose and preprocess """ for entity in self.entities: header = f"{entity}_{_CUMULATIVE_CD_CURVE}" cumulative_cd = np.array(self._values[header]) cd_per_strip = np.insert(np.diff(cumulative_cd), 0, cumulative_cd[0]) header_to_add = f"{entity}_{_CD_PER_STRIP}" self._values[header_to_add] = cd_per_strip.tolist() super()._preprocess( filter_physical_steps_only=filter_physical_steps_only, include_time=include_time )
[docs] class YSlicingForceDistributionResultCSVModel(PerEntityResultCSVModel): """ForceDistributionResultCSVModel""" remote_file_name: str = pd.Field( CaseDownloadable.Y_SLICING_FORCE_DISTRIBUTION.value, frozen=True ) _variables: List[str] = [_CFx_PER_SPAN, _CFz_PER_SPAN, _CMy_PER_SPAN] _filter_when_zero = [_CFx_PER_SPAN, _CFz_PER_SPAN, _CMy_PER_SPAN] _x_columns: List[str] = [_Y]
[docs] class SurfaceHeatTransferResultCSVModel(PerEntityResultCSVModel, TimeSeriesResultCSVModel): """SurfaceHeatTransferResultCSVModel""" remote_file_name: str = pd.Field(CaseDownloadable.SURFACE_HEAT_TRANSFER.value, frozen=True) _variables: List[str] = [_HEAT_FLUX] _filter_when_zero = []
[docs] class AeroacousticsResultCSVModel(TimeSeriesResultCSVModel): """AeroacousticsResultCSVModel""" _x_columns: List[str] = [_PHYSICAL_STEP, _TIME] remote_file_name: str = pd.Field(CaseDownloadable.AEROACOUSTICS.value, frozen=True)
MonitorCSVModel = ResultCSVModel
[docs] class MonitorsResultModel(ResultTarGZModel): """ Model for handling results of monitors in TAR GZ and CSV formats. Inherits from ResultTarGZModel. """ remote_file_name: str = pd.Field(CaseDownloadable.MONITORS_ALL.value, frozen=True) get_download_file_list_method: Optional[Callable] = pd.Field(lambda: None) _monitor_names: List[str] = pd.PrivateAttr([]) _monitors: Dict[str, MonitorCSVModel] = pd.PrivateAttr({}) @property def monitor_names(self): """ Get the list of monitor names. Returns ------- list of str List of monitor names. """ if len(self._monitor_names) == 0: pattern = CaseDownloadable.MONITOR_PATTERN.value file_list = [ file["fileName"] for file in self.get_download_file_list_method() # pylint:disable=not-callable ] for filepath in file_list: if str(Path(filepath).parent) == "results": filename = Path(filepath).name match = re.match(pattern, filename) if match: name = match.group(1) self._monitor_names.append(name) self._monitors[name] = MonitorCSVModel(remote_file_name=filename) # pylint: disable=protected-access self._monitors[name]._download_method = ( self._download_method ) # pylint: disable=protected-access return self._monitor_names
[docs] def get_monitor_by_name(self, name: str) -> MonitorCSVModel: """ Get a monitor by name. Parameters ---------- name : str The name of the monitor. Returns ------- MonitorCSVModel The MonitorCSVModel corresponding to the given name. Raises ------ Flow360ValueError If the monitor with the provided name is not found. """ if name not in self.monitor_names: raise Flow360ValueError( f"Cannot find monitor with provided name={name}, available monitors: {self.monitor_names}" ) return self._monitors[name]
def __getitem__(self, name: str) -> MonitorCSVModel: """ Get a monitor by name (supporting [] access). """ return self.get_monitor_by_name(name)
UserDefinedDynamicsCSVModel = TimeSeriesResultCSVModel
[docs] class UserDefinedDynamicsResultModel(ResultBaseModel): """ Model for handling results of user-defined dynamics. Inherits from ResultBaseModel. """ remote_file_name: str = pd.Field(None, frozen=True) get_download_file_list_method: Optional[Callable] = pd.Field(lambda: None) _udd_names: List[str] = pd.PrivateAttr([]) _udds: Dict[str, UserDefinedDynamicsCSVModel] = pd.PrivateAttr({}) @property def udd_names(self): """ Get the list of user-defined dynamics names. Returns ------- list of str List of user-defined dynamics names. """ if len(self._udd_names) == 0: pattern = CaseDownloadable.USER_DEFINED_DYNAMICS_PATTERN.value file_list = [ file["fileName"] for file in self.get_download_file_list_method() # pylint:disable=not-callable ] for filepath in file_list: if str(Path(filepath).parent) == "results": filename = Path(filepath).name match = re.match(pattern, filename) if match: name = match.group(1) self._udd_names.append(name) self._udds[name] = UserDefinedDynamicsCSVModel(remote_file_name=filename) # pylint: disable=protected-access self._udds[name]._download_method = self._download_method return self._udd_names
[docs] def download( self, to_folder: str = ".", overwrite: bool = False ): # pylint:disable=arguments-differ """ Download all udd files to the specified location. Parameters ---------- to_folder : str, optional The folder where the file will be downloaded. overwrite : bool, optional Flag indicating whether to overwrite existing files. """ for udd in self._udds.values(): udd.download(to_folder=to_folder, overwrite=overwrite)
[docs] def get_udd_by_name(self, name: str) -> UserDefinedDynamicsCSVModel: """ Get user-defined dynamics by name. Parameters ---------- name : str The name of the user-defined dynamics. Returns ------- UserDefinedDynamicsCSVModel The UserDefinedDynamicsCSVModel corresponding to the given name. Raises ------ Flow360ValueError If the user-defined dynamics with the provided name is not found. """ if name not in self.udd_names: raise Flow360ValueError( f"Cannot find user defined dynamics with provided name={name}, " f"available user defined dynamics: {self.udd_names}" ) return self._udds[name]
def __getitem__(self, name: str) -> UserDefinedDynamicsCSVModel: """ Get a UUD by name (supporting [] access). """ return self.get_udd_by_name(name)
class _DimensionedCSVResultModel(pd.BaseModel): """ Base model for handling dimensioned CSV results. Attributes ---------- _name : str Name of the dimensioned CSV result. """ _name: str def _in_base_component(self, base, component, component_name, params): log.debug(f" -> need conversion for: {component_name} = {component}") if isinstance(params, SimulationParams): flow360_conv_system = unit_converter_v2( component.units.dimensions, params=params, required_by=[self._name, component_name], ) elif isinstance(params, Flow360Params): flow360_conv_system = unit_converter_v1( component.units.dimensions, params=params, required_by=[self._name, component_name], ) else: raise Flow360ValueError( f"Unknown type of params: {type(params)=}, expected one of (Flow360Params, SimulationParams)" ) if is_flow360_unit(component): converted = component.in_base(base, flow360_conv_system) else: component.units.registry = flow360_conv_system.registry # pylint:disable=no-member converted = component.in_base(unit_system=base) log.debug(f" converted to: {converted}") return converted class _ActuatorDiskResults(_DimensionedCSVResultModel): """ Model for handling results of actuator disks. Inherits from _DimensionedCSVResultModel. Attributes ---------- power : PowerType.Array Array of power values. force : ForceType.Array Array of force values. moment : MomentType.Array Array of moment values. Methods ------- to_base(base: Any, params: Any) Convert the results to the specified base system. """ power: PowerType.Array = pd.Field() force: ForceType.Array = pd.Field() moment: MomentType.Array = pd.Field() _name = "actuator_disks" def to_base(self, base: str, params: Flow360Params): """ Convert the results to the specified base system. Parameters ---------- base : str The base system to convert the results to, for example SI. params : Flow360Params Case parameters for the conversion. """ self.power = self._in_base_component(base, self.power, "power", params) self.force = self._in_base_component(base, self.force, "force", params) self.moment = self._in_base_component(base, self.moment, "moment", params) class OptionallyDownloadableResultCSVModel(ResultCSVModel): """ Model for handling optionally downloadable CSV results. Inherits from ResultCSVModel. """ _err_msg = "Case does not produced these results." def download( self, to_file: str = None, to_folder: str = ".", overwrite: bool = False, **kwargs ): """ Download the results to the specified file or folder. Parameters ---------- to_file : str, optional The file path where the results will be saved. to_folder : str, optional The folder path where the results will be saved. overwrite : bool, optional Whether to overwrite existing files with the same name. Raises ------ CloudFileNotFoundError If the cloud file for the results is not found. """ # pylint: disable=import-outside-toplevel from botocore.exceptions import ClientError as CloudFileNotFoundError try: super().download( to_file=to_file, to_folder=to_folder, overwrite=overwrite, log_error=False, **kwargs ) except CloudFileNotFoundError as err: if self._is_downloadable() is False: # pylint:disable=not-callable log.warning(self._err_msg) else: log.error( "A problem occurred when trying to download results:" f"{self.remote_file_name}" ) raise err
[docs] class ActuatorDiskResultCSVModel(OptionallyDownloadableResultCSVModel): """ Model for handling actuator disk CSV results. Inherits from OptionallyDownloadableResultCSVModel. Methods ------- to_base(base, params=None) Convert the results to the specified base system. Notes ----- This class provides methods to handle actuator disk CSV results and convert them to the specified base system. """ remote_file_name: str = pd.Field(CaseDownloadable.ACTUATOR_DISKS.value, frozen=True) _err_msg = "Case does not have any actuator disks."
[docs] def to_base(self, base: str, params: Flow360Params = None): """ Convert the results to the specified base system. Parameters ---------- base : str The base system to convert the results to. For example SI. params : Flow360Params, optional Case parameters for the conversion. """ if params is None: params = self._get_params_method() # pylint:disable=not-callable disk_names = np.unique( [v.split("_")[0] for v in self.values.keys() if v.startswith("Disk")] ) with Flow360UnitSystem(verbose=False): for disk_name in disk_names: disk = _ActuatorDiskResults( power=self.values[f"{disk_name}_Power"], force=self.values[f"{disk_name}_Force"], moment=self.values[f"{disk_name}_Moment"], ) disk.to_base(base, params) self.values[f"{disk_name}_Power"] = disk.power self.values[f"{disk_name}_Force"] = disk.force self.values[f"{disk_name}_Moment"] = disk.moment self.values["PowerUnits"] = disk.power.units self.values["ForceUnits"] = disk.force.units self.values["MomentUnits"] = disk.moment.units
[docs] def compute_coefficients(self, params: SimulationParams) -> ActuatorDiskCoefficientsCSVModel: """ Compute disk coefficients from actuator disk forces and moments. Parameters ---------- params : SimulationParams Simulation parameters Returns ------- ActuatorDiskCoefficientsCSVModel Model containing computed coefficients """ return DiskCoefficientsComputation.compute_coefficients_static( params=params, values=self.as_dict(), disk_model_type="ActuatorDisk", iterate_step_values_func=self._iterate_step_values, coefficients_model_class=ActuatorDiskCoefficientsCSVModel, )
@staticmethod def _iterate_step_values(disk_name, disk_ctx, env, values): # pylint:disable=too-many-locals, protected-access force_mag_series = values.get(f"{disk_name}_Force", []) moment_mag_series = values.get(f"{disk_name}_Moment", []) for force_mag, moment_mag in zip(force_mag_series, moment_mag_series): axis = disk_ctx["axis"] center = disk_ctx["center"] force_vec = force_mag * axis r_vec = center - env["moment_center_global"] moment_global = moment_mag * axis + np.cross(r_vec, force_vec) dp_area = env["dynamic_pressure"] * env["area"] denom_force = dp_area denom_moment = dp_area * env["moment_length_vec"] # pylint:disable=invalid-name CF_vec = force_vec / denom_force CM_vec = np.divide( moment_global, denom_moment, out=np.zeros(3), where=denom_moment != 0 ) CD_val = float(np.dot(force_vec, env["drag_dir"]) / denom_force) CL_val = float(np.dot(force_vec, env["lift_dir"]) / denom_force) yield CF_vec, CM_vec, CL_val, CD_val
[docs] class ActuatorDiskCoefficientsCSVModel(ResultCSVModel): """CSV model for actuator disk coefficients output.""" remote_file_name: str = pd.Field("actuatorDisk_force_coefficients_v2.csv", frozen=True)
class _BETDiskResults(_DimensionedCSVResultModel): """ Model for handling BET disk results. Inherits from _DimensionedCSVResultModel. Attributes ---------- force_x : ForceType.Array Array of force values along the x-axis. force_y : ForceType.Array Array of force values along the y-axis. force_z : ForceType.Array Array of force values along the z-axis. moment_x : MomentType.Array Array of moment values about the x-axis. moment_y : MomentType.Array Array of moment values about the y-axis. moment_z : MomentType.Array Array of moment values about the z-axis. _name : str Name of the BET forces result. Methods ------- to_base(base, params) Convert the results to the specified base system. """ force_x: ForceType.Array = pd.Field() force_y: ForceType.Array = pd.Field() force_z: ForceType.Array = pd.Field() moment_x: MomentType.Array = pd.Field() moment_y: MomentType.Array = pd.Field() moment_z: MomentType.Array = pd.Field() _name = "bet_forces" def to_base(self, base: str, params: Flow360Params): """ Convert the results to the specified base system. Parameters ---------- base : str The base system to convert the results to, for example SI. params : Flow360Params Case parameters for the conversion. """ self.force_x = self._in_base_component(base, self.force_x, "force_x", params) self.force_y = self._in_base_component(base, self.force_y, "force_y", params) self.force_z = self._in_base_component(base, self.force_z, "force_z", params) self.moment_x = self._in_base_component(base, self.moment_x, "moment_x", params) self.moment_y = self._in_base_component(base, self.moment_y, "moment_y", params) self.moment_z = self._in_base_component(base, self.moment_z, "moment_z", params)
[docs] class BETForcesResultCSVModel(OptionallyDownloadableResultCSVModel): """ Model for handling BET forces CSV results. Inherits from OptionallyDownloadableResultCSVModel. Methods ------- to_base(base, params=None) Convert the results to the specified base system. """ remote_file_name: str = pd.Field(CaseDownloadable.BET_FORCES.value, frozen=True) _err_msg = "Case does not have any BET disks."
[docs] def to_base(self, base: str, params: Flow360Params = None): """ Convert the results to the specified base system. Parameters ---------- base : str The base system to convert the results to. For example SI. params : Flow360Params, optional Case parameters for the conversion. """ if params is None: params = self._get_params_method() # pylint:disable=not-callable disk_names = np.unique( [v.split("_")[0] for v in self.values.keys() if v.startswith("Disk")] ) with Flow360UnitSystem(verbose=False): for disk_name in disk_names: bet = _BETDiskResults( force_x=self.values[f"{disk_name}_Force_x"], force_y=self.values[f"{disk_name}_Force_y"], force_z=self.values[f"{disk_name}_Force_z"], moment_x=self.values[f"{disk_name}_Moment_x"], moment_y=self.values[f"{disk_name}_Moment_y"], moment_z=self.values[f"{disk_name}_Moment_z"], ) bet.to_base(base, params) self.values[f"{disk_name}_Force_x"] = bet.force_x self.values[f"{disk_name}_Force_y"] = bet.force_y self.values[f"{disk_name}_Force_z"] = bet.force_z self.values[f"{disk_name}_Moment_x"] = bet.moment_x self.values[f"{disk_name}_Moment_y"] = bet.moment_y self.values[f"{disk_name}_Moment_z"] = bet.moment_z self.values["ForceUnits"] = bet.force_x.units self.values["MomentUnits"] = bet.moment_x.units
[docs] def format_headers( self, params: SimulationParams, pattern: str = "$BETName_$CylinderName" ) -> LocalResultCSVModel: """ Renames the header entries from Disk{i}_ to based on an input user pattern such as $BETName_$CylinderName Parameters ---------- params : SimulationParams Simulation parameters pattern : str Pattern string to rename header entries. Available patterns [$BETName, $CylinderName, $DiskLocalIndex, $DiskGlobalIndex] Returns ------- LocalResultCSVModel Model containing csv with updated header """ return BETDiskCSVHeaderOperation.format_headers(self, params, pattern)
[docs] def compute_coefficients(self, params: SimulationParams) -> BETDiskCoefficientsCSVModel: """ Compute disk coefficients from BET disk forces and moments. Parameters ---------- params : SimulationParams Simulation parameters Returns ------- BETDiskCoefficientsCSVModel Model containing computed coefficients """ return DiskCoefficientsComputation.compute_coefficients_static( params=params, values=self.as_dict(), disk_model_type="BETDisk", iterate_step_values_func=self._iterate_step_values, coefficients_model_class=BETDiskCoefficientsCSVModel, )
@staticmethod def _iterate_step_values(disk_name, disk_ctx, env, values): # pylint:disable=protected-access, too-many-locals fx_series = values.get(f"{disk_name}_Force_x", []) fy_series = values.get(f"{disk_name}_Force_y", []) fz_series = values.get(f"{disk_name}_Force_z", []) mx_series = values.get(f"{disk_name}_Moment_x", []) my_series = values.get(f"{disk_name}_Moment_y", []) mz_series = values.get(f"{disk_name}_Moment_z", []) for fx, fy, fz, mx, my, mz in zip( fx_series, fy_series, fz_series, mx_series, my_series, mz_series ): center = disk_ctx["center"] force_vec = np.array([fx, fy, fz], dtype=float) moment_vec = np.array([mx, my, mz], dtype=float) r_vec = center - env["moment_center_global"] moment_global = moment_vec + np.cross(r_vec, force_vec) dp_area = env["dynamic_pressure"] * env["area"] denom_force = dp_area denom_moment = dp_area * env["moment_length_vec"] # pylint:disable=invalid-name CF_vec = force_vec / denom_force CM_vec = np.divide( moment_global, denom_moment, out=np.zeros(3), where=denom_moment != 0 ) CD_val = float(np.dot(force_vec, env["drag_dir"]) / denom_force) CL_val = float(np.dot(force_vec, env["lift_dir"]) / denom_force) yield CF_vec, CM_vec, CL_val, CD_val
[docs] class BETDiskCoefficientsCSVModel(ResultCSVModel): """CSV model for BET disk coefficients output.""" remote_file_name: str = pd.Field("bet_force_coefficients_v2.csv", frozen=True)
[docs] def format_headers( self, params: SimulationParams, pattern: str = "$BETName_$CylinderName" ) -> LocalResultCSVModel: """ Renames the header entries from Disk{i}_ to based on an input user pattern such as $BETName_$CylinderName Parameters ---------- params : SimulationParams Simulation parameters pattern : str Pattern string to rename header entries. Available patterns [$BETName, $CylinderName, $DiskLocalIndex, $DiskGlobalIndex] Returns ------- LocalResultCSVModel Model containing csv with updated header """ return BETDiskCSVHeaderOperation.format_headers(self, params, pattern)
[docs] class PorousMediumResultCSVModel(OptionallyDownloadableResultCSVModel): """Model for handling porous medium CSV results.""" remote_file_name: str = pd.Field(CaseDownloadable.POROUS_MEDIA.value, frozen=True) _err_msg = "Case does not have any porous media zones."
[docs] def compute_coefficients(self, params: SimulationParams) -> PorousMediumCoefficientsCSVModel: """ Compute porous medium coefficients from forces and moments. Parameters ---------- params : SimulationParams Simulation parameters Returns ------- PorousMediumCoefficientsCSVModel Model containing computed coefficients """ return PorousMediumCoefficientsComputation.compute_coefficients_static( params=params, values=self.as_dict(), iterate_step_values_func=self._iterate_step_values, coefficients_model_class=PorousMediumCoefficientsCSVModel, )
@staticmethod def _iterate_step_values(zone_name, _, env, values): # pylint:disable=protected-access, too-many-locals fx_series = values.get(f"{zone_name}_Force_x", []) fy_series = values.get(f"{zone_name}_Force_y", []) fz_series = values.get(f"{zone_name}_Force_z", []) mx_series = values.get(f"{zone_name}_Moment_x", []) my_series = values.get(f"{zone_name}_Moment_y", []) mz_series = values.get(f"{zone_name}_Moment_z", []) for fx, fy, fz, mx, my, mz in zip( fx_series, fy_series, fz_series, mx_series, my_series, mz_series ): force_vec = np.array([fx, fy, fz], dtype=float) moment_vec = np.array([mx, my, mz], dtype=float) # Note: moment is already relative to global moment center from solver dp_area = env["dynamic_pressure"] * env["area"] denom_force = dp_area denom_moment = dp_area * env["moment_length_vec"] # pylint:disable=invalid-name CF_vec = force_vec / denom_force CM_vec = np.divide(moment_vec, denom_moment, out=np.zeros(3), where=denom_moment != 0) CD_val = float(np.dot(force_vec, env["drag_dir"]) / denom_force) CL_val = float(np.dot(force_vec, env["lift_dir"]) / denom_force) yield CF_vec, CM_vec, CL_val, CD_val
[docs] class PorousMediumCoefficientsCSVModel(ResultCSVModel): """CSV model for porous medium coefficients output.""" remote_file_name: str = pd.Field("porous_media_force_coefficients_v2.csv", frozen=True)
[docs] class BETForcesRadialDistributionResultCSVModel(OptionallyDownloadableResultCSVModel): """ Model for handling BET forces radial distribution CSV results. Inherits from OptionallyDownloadableResultCSVModel. """ remote_file_name: str = pd.Field( CaseDownloadable.BET_FORCES_RADIAL_DISTRIBUTION.value, frozen=True ) _err_msg = "Case does not have any BET disks."
[docs] def format_headers( self, params: SimulationParams, pattern: str = "$BETName_$CylinderName" ) -> LocalResultCSVModel: """ Renames the header entries from Disk{i}_ to based on an input user pattern such as $BETName_$CylinderName Parameters ---------- params : SimulationParams Simulation parameters pattern : str Pattern string to rename header entries. Available patterns [$BETName, $CylinderName, $DiskLocalIndex, $DiskGlobalIndex] Returns ------- LocalResultCSVModel Model containing csv with updated header """ return BETDiskCSVHeaderOperation.format_headers(self, params, pattern)