"""Output for simulation."""
from typing import Literal, Optional, Union
import numpy as np
import pydantic as pd
from flow360.component.simulation.entity_operation import (
_transform_direction,
_transform_point,
)
from flow360.component.simulation.framework.base_model import Flow360BaseModel
from flow360.component.simulation.framework.entity_base import EntityBase
from flow360.component.simulation.framework.entity_utils import generate_uuid
from flow360.component.simulation.outputs.output_fields import IsoSurfaceFieldNames
from flow360.component.simulation.unit_system import LengthType
from flow360.component.simulation.user_code.core.types import (
Expression,
UnytQuantity,
UserVariable,
ValueOrExpression,
get_input_value_dimensions,
get_input_value_length,
infer_units_by_unit_system,
is_variable_with_unit_system_as_units,
solver_variable_to_user_variable,
)
from flow360.component.simulation.user_code.core.utils import is_runtime_expression
from flow360.component.types import Axis
class _OutputItemBase(Flow360BaseModel):
name: str = pd.Field()
def __hash__(self):
return hash(self.name + "-" + self.__class__.__name__)
def __eq__(self, other):
if isinstance(other, _OutputItemBase):
return (self.name + "-" + self.__class__.__name__) == (
other.name + "-" + other.__class__.__name__
)
return False
def __str__(self):
return f"{self.__class__.__name__} with name: {self.name}"
[docs]
class Slice(EntityBase):
"""
:class:`Slice` class for defining a slice for :class:`~flow360.SliceOutput`.
Example
-------
Define a :class:`Slice` along (0,1,0) direction with the origin of (0,2,0) fl.u.m.
>>> fl.Slice(
... name="Slice",
... normal=(0, 1, 0),
... origin=(0, 2, 0)*fl.u.m
... )
====
"""
private_attribute_entity_type_name: Literal["Slice"] = pd.Field("Slice", frozen=True)
private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
normal: Axis = pd.Field(description="Normal direction of the slice.")
# pylint: disable=no-member
origin: LengthType.Point = pd.Field(description="A single point on the slice.")
def _apply_transformation(self, matrix: np.ndarray) -> "Slice":
"""Apply 3x4 transformation matrix, returning new transformed instance."""
# Transform the origin point
origin_array = np.asarray(self.origin.value)
new_origin_array = _transform_point(origin_array, matrix)
new_origin = type(self.origin)(new_origin_array, self.origin.units)
# Transform and normalize the normal direction
normal_array = np.asarray(self.normal)
transformed_normal = _transform_direction(normal_array, matrix)
new_normal = tuple(transformed_normal / np.linalg.norm(transformed_normal))
return self.model_copy(update={"origin": new_origin, "normal": new_normal})
[docs]
class Isosurface(_OutputItemBase):
"""
:class:`Isosurface` class for defining an isosurface for :class:`~flow360.IsosurfaceOutput`.
Example
-------
Define a :class:`Isosurface` of temperature equal to 1.5 non-dimensional temperature.
>>> fl.Isosurface(
... name="Isosurface_T_1.5",
... iso_value=1.5,
... field="T",
... wallDistanceClipThreshold=0.005 * fl.u.m, (optional)
... )
====
"""
field: Union[IsoSurfaceFieldNames, str, UserVariable] = pd.Field(
description="Isosurface field variable. One of :code:`p`, :code:`rho`, "
":code:`Mach`, :code:`qcriterion`, :code:`s`, :code:`T`, :code:`Cp`, :code:`mut`,"
" :code:`nuHat` or one of scalar field defined in :class:`UserDefinedField`."
)
# pylint: disable=fixme
iso_value: ValueOrExpression[Union[UnytQuantity, float]] = pd.Field(
description="Expect non-dimensional value.",
)
# pylint: disable=no-member
wall_distance_clip_threshold: Optional[LengthType.Positive] = pd.Field(
default=None,
description="Optional parameter to remove the isosurface within a specified distance from walls.",
)
@pd.field_validator("field", mode="before")
@classmethod
def _preprocess_expression_and_solver_variable(cls, value):
if isinstance(value, Expression):
raise ValueError(
f"Expression ({value}) cannot be directly used as isosurface field, "
"please define a UserVariable first."
)
return solver_variable_to_user_variable(value)
@pd.field_validator("iso_value", mode="before")
@classmethod
def _preprocess_field_with_unit_system(cls, value, info: pd.ValidationInfo):
if is_variable_with_unit_system_as_units(value):
return value
if info.data.get("field") is None:
# `field` validation failed.
raise ValueError(
"The isosurface field is invalid and therefore unit inference is not possible."
)
units = value["units"]
field = info.data["field"]
field_dimensions = get_input_value_dimensions(value=field)
value = infer_units_by_unit_system(
value=value, value_dimensions=field_dimensions, unit_system=units
)
return value
@pd.field_validator("field", mode="after")
@classmethod
def check_expression_length(cls, v):
"""Ensure the isofield is a scalar."""
if isinstance(v, UserVariable) and len(v) != 0:
raise ValueError(f"The isosurface field ({v}) must be defined with a scalar variable.")
return v
@pd.field_validator("field", mode="after")
@classmethod
def check_runtime_expression(cls, v):
"""Ensure the isofield is a runtime expression but not a constant value."""
if isinstance(v, UserVariable):
if not isinstance(v.value, Expression):
raise ValueError(f"The isosurface field ({v}) cannot be a constant value.")
try:
result = v.value.evaluate(raise_on_non_evaluable=False, force_evaluate=True)
except Exception as err:
raise ValueError(f"expression evaluation failed for the isofield: {err}") from err
if not is_runtime_expression(result):
raise ValueError(f"The isosurface field ({v}) cannot be a constant value.")
return v
@pd.field_validator("iso_value", mode="after")
@classmethod
def check_single_iso_value(cls, v):
"""Ensure the iso_value is a single value."""
if get_input_value_length(v) == 0:
return v
raise ValueError(f"The iso_value ({v}) must be a scalar.")
@pd.field_validator("iso_value", mode="after")
@classmethod
def check_iso_value_dimensions(cls, v, info: pd.ValidationInfo):
"""Ensure the iso_value has the same dimensions as the field."""
field = info.data.get("field", None)
if not isinstance(field, UserVariable):
return v
value_dimensions = get_input_value_dimensions(value=v)
if value_dimensions is None:
return v
field_dimensions = get_input_value_dimensions(value=field)
if field_dimensions != value_dimensions:
raise ValueError(
f"The iso_value ({v}, dimensions:{value_dimensions}) should have the same dimensions as "
f"the isosurface field ({field}, dimensions: {field_dimensions})."
)
return v
@pd.field_validator("iso_value", mode="after")
@classmethod
def check_iso_value_for_string_field(cls, v, info: pd.ValidationInfo):
"""Ensure the iso_value is float when string field is used."""
field = info.data.get("field", None)
if isinstance(field, str) and not isinstance(v, float):
raise ValueError(
f"The isosurface field ({field}) specified by string "
"can only be used with a nondimensional iso_value."
)
return v
[docs]
class Point(EntityBase):
"""
:class:`Point` class for defining a single point used in various outputs.
Example
-------
>>> fl.Point(
... name="Point",
... location=(1.0, 2.0, 3.0) * fl.u.m,
... )
====
"""
private_attribute_entity_type_name: Literal["Point"] = pd.Field("Point", frozen=True)
private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
# pylint: disable=no-member
location: LengthType.Point = pd.Field(description="The coordinate of the point.")
def _apply_transformation(self, matrix: np.ndarray) -> "Point":
"""Apply 3x4 transformation matrix, returning new transformed instance."""
location_array = np.asarray(self.location.value)
new_location_array = _transform_point(location_array, matrix)
new_location = type(self.location)(new_location_array, self.location.units)
return self.model_copy(update={"location": new_location})
[docs]
class PointArray(EntityBase):
"""
:class:`PointArray` class for defining multiple equally spaced monitor points along a line used in various outputs.
Example
-------
Define :class:`PointArray` with 6 equally spaced points along a line starting from
(0,0,0) * fl.u.m to (1,2,3) * fl.u.m.
Both the starting and end points are included in the :class:`PointArray`.
>>> fl.PointArray(
... name="Line_1",
... start=(0.0, 0.0, 0.0) * fl.u.m,
... end=(1.0, 2.0, 3.0) * fl.u.m,
... number_of_points=6,
... )
====
"""
private_attribute_entity_type_name: Literal["PointArray"] = pd.Field("PointArray", frozen=True)
private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
# pylint: disable=no-member
start: LengthType.Point = pd.Field(description="The starting point of the line.")
end: LengthType.Point = pd.Field(description="The end point of the line.")
number_of_points: int = pd.Field(ge=2, description="Number of points along the line.")
def _apply_transformation(self, matrix: np.ndarray) -> "PointArray":
"""Apply 3x4 transformation matrix, returning new transformed instance."""
start_array = np.asarray(self.start.value)
end_array = np.asarray(self.end.value)
new_start_array = _transform_point(start_array, matrix)
new_end_array = _transform_point(end_array, matrix)
new_start = type(self.start)(new_start_array, self.start.units)
new_end = type(self.end)(new_end_array, self.end.units)
return self.model_copy(update={"start": new_start, "end": new_end})
[docs]
class PointArray2D(EntityBase):
"""
:class:`PointArray2D` class for defining multiple equally spaced points along the u and
v axes of a parallelogram.
Example
-------
Define :class:`PointArray2D` with points equally distributed on a parallelogram with
origin (1.0, 0.0, 0.0) * fl.u.m. There are 7 equally spaced points along the parallelogram's u-axis
of (0.5, 1.0, 0.2) * fl.u.m and 10 equally spaced points along the its v-axis of
(0.1, 0, 1) * fl.u.m.
Both the starting and end points are included in the :class:`PointArray`.
>>> fl.PointArray2D(
... name="Parallelogram_1",
... origin=(1.0, 0.0, 0.0) * fl.u.m,
... u_axis_vector=(0.5, 1.0, 0.2) * fl.u.m,
... v_axis_vector=(0.1, 0, 1) * fl.u.m,
... u_number_of_points=7,
... v_number_of_points=10
... )
====
"""
private_attribute_entity_type_name: Literal["PointArray2D"] = pd.Field(
"PointArray2D", frozen=True
)
private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
# pylint: disable=no-member
origin: LengthType.Point = pd.Field(description="The corner of the parallelogram.")
u_axis_vector: LengthType.Axis = pd.Field(description="The scaled u-axis of the parallelogram.")
v_axis_vector: LengthType.Axis = pd.Field(description="The scaled v-axis of the parallelogram.")
u_number_of_points: int = pd.Field(ge=2, description="The number of points along the u axis.")
v_number_of_points: int = pd.Field(ge=2, description="The number of points along the v axis.")
def _apply_transformation(self, matrix: np.ndarray) -> "PointArray2D":
"""Apply 3x4 transformation matrix, returning new transformed instance."""
# Transform the origin point
origin_array = np.asarray(self.origin.value)
new_origin_array = _transform_point(origin_array, matrix)
new_origin = type(self.origin)(new_origin_array, self.origin.units)
# Transform the u and v axis vectors (these are scaled directions, not unit vectors)
u_axis_array = np.asarray(self.u_axis_vector.value)
v_axis_array = np.asarray(self.v_axis_vector.value)
new_u_axis_array = _transform_direction(u_axis_array, matrix)
new_v_axis_array = _transform_direction(v_axis_array, matrix)
new_u_axis = type(self.u_axis_vector)(new_u_axis_array, self.u_axis_vector.units)
new_v_axis = type(self.v_axis_vector)(new_v_axis_array, self.v_axis_vector.units)
return self.model_copy(
update={"origin": new_origin, "u_axis_vector": new_u_axis, "v_axis_vector": new_v_axis}
)