# pylint: disable=too-many-lines
"""
Primitive type definitions for simulation entities.
"""
import re
from abc import ABCMeta, abstractmethod
from typing import Annotated, ClassVar, List, Literal, Optional, Tuple, Union, final
import numpy as np
import pydantic as pd
from pydantic import PositiveFloat
from typing_extensions import Self
import flow360.component.simulation.units as u
from flow360.component.simulation.entity_operation import (
_extract_rotation_matrix,
_extract_scale_from_matrix,
_is_uniform_scale,
_rotation_matrix_to_axis_angle,
_transform_direction,
_transform_point,
rotation_matrix_from_axis_and_angle,
)
from flow360.component.simulation.framework.base_model import Flow360BaseModel
from flow360.component.simulation.framework.entity_base import EntityBase, EntityList
from flow360.component.simulation.framework.entity_utils import generate_uuid
from flow360.component.simulation.framework.multi_constructor_model_base import (
MultiConstructorBaseModel,
)
from flow360.component.simulation.framework.unique_list import UniqueStringList
from flow360.component.simulation.unit_system import AngleType, AreaType, LengthType
from flow360.component.simulation.user_code.core.types import ValueOrExpression
from flow360.component.simulation.utils import BoundingBoxType, model_attribute_unlock
from flow360.component.simulation.validation.validation_context import (
ParamsValidationInfo,
contextual_field_validator,
contextual_model_validator,
)
from flow360.component.types import Axis
from flow360.component.utils import _naming_pattern_handler
from flow360.exceptions import Flow360DeprecationError, Flow360ValueError
BOUNDARY_FULL_NAME_WHEN_NOT_FOUND = "This boundary does not exist!!!"
def _get_generated_boundary_names(surface_name: str, volume_mesh_meta: dict[str, dict]) -> list:
"""
Returns all the boundaries that are eventually generated by name matching in the volume mesh metadata.
May return multiple boundaries when the original one is split into multiple boundaries.
"""
full_boundary_names = []
for zone_name, zone_meta in volume_mesh_meta["zones"].items():
for existing_boundary_name in zone_meta["boundaryNames"]:
pattern = re.escape(zone_name) + r"/(.*)"
match = re.search(pattern, existing_boundary_name)
if (
match is not None and match.group(1) == surface_name
) or existing_boundary_name == surface_name:
full_boundary_names.append(existing_boundary_name)
# Not found
if not full_boundary_names:
return [BOUNDARY_FULL_NAME_WHEN_NOT_FOUND]
return full_boundary_names
def _check_axis_is_orthogonal(axis_pair: Tuple[Axis, Axis]) -> Tuple[Axis, Axis]:
axis_1, axis_2 = np.array(axis_pair[0]), np.array(axis_pair[1])
dot_product = np.dot(axis_1, axis_2)
if not np.isclose(dot_product, 0, atol=1e-3):
raise ValueError(f"The two axes are not orthogonal, dot product is {dot_product}.")
axis_2 -= dot_product * axis_1
axis_2 /= np.linalg.norm(axis_2)
return (tuple(axis_1), tuple(axis_2))
def _auto_symmetric_plane_exists_from_bbox(
*,
global_bounding_box: BoundingBoxType,
planar_face_tolerance: float,
) -> bool:
"""
Determine whether automated farfield logic will generate a `symmetric` plane
from global bounding box extents and planar-face tolerance.
"""
y_min = global_bounding_box[0][1]
y_max = global_bounding_box[1][1]
tolerance = global_bounding_box.largest_dimension * planar_face_tolerance
positive_half = abs(y_min) < tolerance < y_max
negative_half = abs(y_max) < tolerance and y_min < -tolerance
return positive_half or negative_half
OrthogonalAxes = Annotated[Tuple[Axis, Axis], pd.AfterValidator(_check_axis_is_orthogonal)]
[docs]
class ReferenceGeometry(Flow360BaseModel):
"""
:class:`ReferenceGeometry` class contains all geometrical related reference values.
Example
-------
>>> ReferenceGeometry(
... moment_center=(1, 2, 1) * u.m,
... moment_length=(1, 1, 1) * u.m,
... area=1.5 * u.m**2
... )
>>> ReferenceGeometry(
... moment_center=(1, 2, 1) * u.m,
... moment_length=1 * u.m,
... area=1.5 * u.m**2
... ) # Equivalent to above
====
"""
# pylint: disable=no-member
moment_center: Optional[LengthType.Point] = pd.Field(
None, description="The x, y, z coordinate of moment center."
)
moment_length: Optional[Union[LengthType.Positive, LengthType.PositiveVector]] = pd.Field(
None, description="The x, y, z component-wise moment reference lengths."
)
area: Optional[ValueOrExpression[AreaType.Positive]] = pd.Field(
None, description="The reference area of the geometry."
)
private_attribute_area_settings: Optional[dict] = pd.Field(None)
[docs]
@classmethod
def fill_defaults(cls, ref, params): # type: ignore[override]
"""Return a new ReferenceGeometry with defaults filled using SimulationParams.
Defaults when missing or when ref is None:
- area: 1 * (base_length)**2
- moment_center: (0,0,0) * base_length
- moment_length: (1,1,1) * base_length
"""
# Note:
# This helper avoids scattering default logic; consumers can always call this
# to obtain a fully-specified reference geometry in solver units.
# `params.base_length` provides the length unit for the project.
# Determine base length unit from params
base_length_unit = params.base_length # LengthType quantity
# Start from provided or empty
if ref is None:
ref = cls()
# Compose output using provided values when available
area = ref.area
if area is None:
area = 1.0 * (base_length_unit**2)
moment_center = ref.moment_center
if moment_center is None:
moment_center = (0, 0, 0) * base_length_unit
moment_length = ref.moment_length
if moment_length is None:
moment_length = (1.0, 1.0, 1.0) * base_length_unit
return cls(area=area, moment_center=moment_center, moment_length=moment_length)
class GeometryBodyGroup(EntityBase):
"""
:class:`GeometryBodyGroup` represents a collection of bodies that are grouped for meshing and
coordinate-system-based transformation.
"""
private_attribute_tag_key: str = pd.Field(
description="The tag/attribute string used to group bodies.",
)
private_attribute_entity_type_name: Literal["GeometryBodyGroup"] = pd.Field(
"GeometryBodyGroup", frozen=True
)
private_attribute_sub_components: List[str] = pd.Field(
description="A list of body IDs which constitutes the current body group"
)
private_attribute_color: Optional[str] = pd.Field(
None, description="Color used for visualization"
)
mesh_exterior: bool = pd.Field(
True,
description="Option to define whether to mesh exterior or interior of body group in geometry AI."
"Note that this is a beta feature and the interface might change in future releases.",
)
@property
def transformation(self):
"""Deprecated property."""
raise Flow360DeprecationError(
"GeometryBodyGroup.transformation is deprecated and has been removed. "
"Please use CoordinateSystem for transformations instead."
)
@transformation.setter
def transformation(self, value):
"""Deprecated property setter."""
raise Flow360DeprecationError(
"GeometryBodyGroup.transformation is deprecated and has been removed. "
"Please use CoordinateSystem for transformations instead."
)
class _VolumeEntityBase(EntityBase, metaclass=ABCMeta):
"""All volumetric entities should inherit from this class."""
private_attribute_zone_boundary_names: UniqueStringList = pd.Field(
UniqueStringList(),
frozen=True,
description="Boundary names of the zone WITH the prepending zone name.",
)
private_attribute_full_name: Optional[str] = pd.Field(None, frozen=True)
def _is_volume_zone(self) -> bool:
"""This is not a zone if zone boundaries are not defined. For validation usage."""
return self.private_attribute_zone_boundary_names is not None
def _update_entity_info_with_metadata(self, volume_mesh_meta_data: dict[str, dict]) -> None:
"""
Update the full name of zones once the volume mesh is done.
e.g. rotating_cylinder --> rotatingBlock-rotating_cylinder
"""
entity_name = self.name
for zone_full_name, zone_meta in volume_mesh_meta_data["zones"].items():
pattern = r"rotatingBlock-" + re.escape(entity_name)
if entity_name == "__farfield_zone_name_not_properly_set_yet":
# We have hardcoded name for farfield zone.
pattern = r"stationaryBlock|fluid"
match = re.search(pattern, zone_full_name)
if match is not None or entity_name == zone_full_name:
with model_attribute_unlock(self, "private_attribute_full_name"):
self.private_attribute_full_name = zone_full_name
with model_attribute_unlock(self, "private_attribute_zone_boundary_names"):
self.private_attribute_zone_boundary_names = UniqueStringList(
items=zone_meta["boundaryNames"]
)
break
@property
def full_name(self):
"""Gets the full name which includes the zone name"""
if self.private_attribute_full_name is None:
return self.name
return self.private_attribute_full_name
class _SurfaceEntityBase(EntityBase, metaclass=ABCMeta):
private_attribute_full_name: Optional[str] = pd.Field(None, frozen=True)
def _update_entity_info_with_metadata(self, volume_mesh_meta_data: dict) -> None:
"""
Update parent zone name once the volume mesh is done.
"""
updated_boundary_names = _get_generated_boundary_names(self.name, volume_mesh_meta_data)
with model_attribute_unlock(self, "private_attribute_full_name"):
self.private_attribute_full_name = updated_boundary_names.pop(0)
multiplication_result = []
for new_boundary_name in updated_boundary_names:
multiplication_result.append(
self.copy(
update={
"name": new_boundary_name,
"private_attribute_full_name": new_boundary_name,
}
)
)
return multiplication_result if multiplication_result else None
@property
def full_name(self):
"""Gets the full name which includes the zone name"""
if self.private_attribute_full_name is None:
return self.name
return self.private_attribute_full_name
@final
class Edge(EntityBase):
"""
Edge which contains a set of grouped edges from geometry.
"""
private_attribute_entity_type_name: Literal["Edge"] = pd.Field("Edge", frozen=True)
private_attribute_tag_key: Optional[str] = pd.Field(
None,
description="The tag/attribute string used to group geometry edges to form this `Edge`.",
)
private_attribute_sub_components: Optional[List[str]] = pd.Field(
[], description="The edge ids in geometry that composed into this `Edge`."
)
@final
class GenericVolume(_VolumeEntityBase):
"""
Do not expose.
This type of entity will get auto-constructed by assets when loading metadata.
By design these GenericVolume entities should only contain basic connectivity/mesh information.
These can only come from uploaded volume mesh.
"""
private_attribute_entity_type_name: Literal["GenericVolume"] = pd.Field(
"GenericVolume", frozen=True
)
axes: Optional[OrthogonalAxes] = pd.Field(None, description="") # Porous media support
axis: Optional[Axis] = pd.Field(None) # Rotation support
# pylint: disable=no-member
center: Optional[LengthType.Point] = pd.Field(None, description="") # Rotation support
class BoxCache(Flow360BaseModel):
"""BoxCache"""
# `axes` will always exist as it needs to be used. So `axes` is more like a storage than input cache.
axes: Optional[OrthogonalAxes] = pd.Field(None)
# pylint: disable=no-member
center: Optional[LengthType.Point] = pd.Field(None)
size: Optional[LengthType.PositiveVector] = pd.Field(None)
name: Optional[str] = pd.Field(None)
[docs]
@final
class Box(MultiConstructorBaseModel, _VolumeEntityBase):
"""
:class:`Box` class represents a box in three-dimensional space.
Example
-------
>>> fl.Box(
... name="box",
... axis_of_rotation = (1, 0, 0),
... angle_of_rotation = 45 * fl.u.deg,
... center = (1, 1, 1) * fl.u.m,
... size=(0.2, 0.3, 2) * fl.u.m,
... )
Define a box using principal axes:
>>> fl.Box.from_principal_axes(
... name="box",
... axes=[(0, 1, 0), (0, 0, 1)],
... center=(0, 0, 0) * fl.u.m,
... size=(0.2, 0.3, 2) * fl.u.m,
... )
====
"""
type_name: Literal["Box"] = pd.Field("Box", frozen=True)
# pylint: disable=no-member
center: LengthType.Point = pd.Field(description="The coordinates of the center of the box.")
size: LengthType.PositiveVector = pd.Field(
description="The dimensions of the box (length, width, height)."
)
axis_of_rotation: Axis = pd.Field(
default=(0, 0, 1),
description="The rotation axis. Cannot change once specified.",
frozen=True,
)
angle_of_rotation: AngleType = pd.Field(
default=0 * u.degree,
description="The rotation angle. Cannot change once specified.",
frozen=True,
)
private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
private_attribute_input_cache: BoxCache = pd.Field(BoxCache(), frozen=True)
private_attribute_entity_type_name: Literal["Box"] = pd.Field("Box", frozen=True)
# pylint: disable=no-self-argument
[docs]
@MultiConstructorBaseModel.model_constructor
@pd.validate_call
def from_principal_axes(
cls,
name: str,
center: LengthType.Point,
size: LengthType.PositiveVector,
axes: OrthogonalAxes,
):
"""
Construct box from principal axes
"""
# validate
x_axis, y_axis = np.array(axes[0]), np.array(axes[1])
z_axis = np.cross(x_axis, y_axis)
rotation_matrix = np.transpose(np.asarray([x_axis, y_axis, z_axis], dtype=np.float64))
# Calculate the rotation axis n using numpy instead of scipy
eigvals, eigvecs = np.linalg.eig(rotation_matrix)
axis = np.real(eigvecs[:, np.where(np.isreal(eigvals))])
if axis.shape[2] > 1: # in case of 0 rotation angle
axis = axis[:, :, 0]
axis = np.ndarray.flatten(axis)
angle = np.sum(abs(np.angle(eigvals))) / 2
# Find correct angle
matrix_test = rotation_matrix_from_axis_and_angle(axis, angle)
angle *= -1 if np.isclose(rotation_matrix[0, :] @ matrix_test[:, 0], 1) else 1
# pylint: disable=not-callable
return cls(
name=name,
center=center,
size=size,
axis_of_rotation=tuple(axis),
angle_of_rotation=angle * u.rad,
)
@pd.model_validator(mode="after")
def _convert_axis_and_angle_to_coordinate_axes(self) -> Self:
"""
Converts the Box object's axis and angle orientation information to a
coordinate axes representation.
"""
# Ensure the axis is a numpy array
if not self.private_attribute_input_cache.axes:
axis = np.asarray(self.axis_of_rotation, dtype=np.float64)
angle = self.angle_of_rotation.to("rad").v.item()
# Normalize the axis vector
axis = axis / np.linalg.norm(axis)
rotation_matrix = rotation_matrix_from_axis_and_angle(axis, angle)
# pylint: disable=assigning-non-slot
self.private_attribute_input_cache.axes = np.transpose(rotation_matrix[:, :2]).tolist()
return self
@property
def axes(self):
"""Return the axes that the box is aligned with."""
return self.private_attribute_input_cache.axes
@pd.field_validator("center", "size", mode="after")
@classmethod
def _update_input_cache(cls, value, info: pd.ValidationInfo):
setattr(info.data["private_attribute_input_cache"], info.field_name, value)
return value
def _apply_transformation(self, matrix: np.ndarray) -> "Box":
"""Apply 3x4 transformation matrix with uniform scale validation and rotation composition."""
# Validate uniform scaling
if not _is_uniform_scale(matrix):
scale_factors = _extract_scale_from_matrix(matrix)
raise Flow360ValueError(
f"Box only supports uniform scaling. " f"Detected scale factors: {scale_factors}"
)
# Extract uniform scale factor
uniform_scale = _extract_scale_from_matrix(matrix)[0]
# Transform center
center_array = np.asarray(self.center.value)
new_center_array = _transform_point(center_array, matrix)
new_center = type(self.center)(new_center_array, self.center.units)
# Combine rotations: existing rotation + transformation rotation
# Step 1: Get existing rotation matrix from axis-angle
existing_axis = np.asarray(self.axis_of_rotation, dtype=np.float64)
existing_axis = existing_axis / np.linalg.norm(existing_axis)
existing_angle = self.angle_of_rotation.to("rad").v.item()
rot_existing = rotation_matrix_from_axis_and_angle(existing_axis, existing_angle)
# Step 2: Extract pure rotation from transformation matrix
rot_transform = _extract_rotation_matrix(matrix)
# Step 3: Combine rotations (apply transformation rotation to existing)
rot_combined = rot_transform @ rot_existing
# Step 4: Extract new axis-angle from combined rotation
new_axis, new_angle = _rotation_matrix_to_axis_angle(rot_combined)
# Scale size uniformly
new_size = self.size * uniform_scale
return self.model_copy(
update={
"center": new_center,
"axis_of_rotation": tuple(new_axis),
"angle_of_rotation": new_angle * u.rad,
"size": new_size,
}
)
[docs]
@final
class Cylinder(_VolumeEntityBase):
"""
:class:`Cylinder` class represents a cylinder in three-dimensional space.
Example
-------
>>> fl.Cylinder(
... name="bet_disk_volume",
... center=(0, 0, 0) * fl.u.inch,
... axis=(0, 0, 1),
... outer_radius=150 * fl.u.inch,
... height=15 * fl.u.inch,
... )
====
"""
private_attribute_entity_type_name: Literal["Cylinder"] = pd.Field("Cylinder", frozen=True)
axis: Axis = pd.Field(description="The axis of the cylinder.")
# pylint: disable=no-member
center: LengthType.Point = pd.Field(description="The center point of the cylinder.")
height: LengthType.Positive = pd.Field(description="The height of the cylinder.")
inner_radius: Optional[LengthType.NonNegative] = pd.Field(
0 * u.m, description="The inner radius of the cylinder."
)
outer_radius: LengthType.Positive = pd.Field(description="The outer radius of the cylinder.")
private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
@pd.model_validator(mode="after")
def _check_inner_radius_is_less_than_outer_radius(self) -> Self:
if self.inner_radius is not None and self.inner_radius >= self.outer_radius:
raise ValueError(
f"Cylinder inner radius ({self.inner_radius}) must be less than outer radius ({self.outer_radius})."
)
return self
def _apply_transformation(self, matrix: np.ndarray) -> "Cylinder":
"""Apply 3x4 transformation matrix with uniform scale validation."""
# Validate uniform scaling
if not _is_uniform_scale(matrix):
scale_factors = _extract_scale_from_matrix(matrix)
raise Flow360ValueError(
f"Cylinder only supports uniform scaling. "
f"Detected scale factors: {scale_factors}"
)
# Extract uniform scale factor
uniform_scale = _extract_scale_from_matrix(matrix)[0]
# Transform center
center_array = np.asarray(self.center.value)
new_center_array = _transform_point(center_array, matrix)
new_center = type(self.center)(new_center_array, self.center.units)
# Rotate axis
axis_array = np.asarray(self.axis)
transformed_axis = _transform_direction(axis_array, matrix)
new_axis = tuple(transformed_axis / np.linalg.norm(transformed_axis))
# Scale dimensions uniformly
new_height = self.height * uniform_scale
new_outer_radius = self.outer_radius * uniform_scale
new_inner_radius = (
self.inner_radius * uniform_scale if self.inner_radius is not None else None
)
return self.model_copy(
update={
"center": new_center,
"axis": new_axis,
"height": new_height,
"outer_radius": new_outer_radius,
"inner_radius": new_inner_radius,
}
)
[docs]
@final
class AxisymmetricBody(_VolumeEntityBase):
"""
:class:`AxisymmetricBody` class represents a generic body of revolution in three-dimensional space,
represented as a list[(Axial Position, Radial Extent)] profile polyline with arbitrary center and axial direction.
Expect first and last profile samples to connect to axis, i.e., have radius = 0.
Example
-------
>>> fl.AxisymmetricBody(
... name="cone_frustum_body",
... center=(0, 0, 0) * fl.u.inch,
... axis=(0, 0, 1),
... profile_curve = [(-1, 0) * fl.u.inch, (-1, 1) * fl.u.inch, (1, 2) * fl.u.inch, (1, 0) * fl.u.inch]
... )
====
"""
private_attribute_entity_type_name: Literal["AxisymmetricBody"] = pd.Field(
"AxisymmetricBody", frozen=True
)
axis: Axis = pd.Field(description="The axis of the body of revolution.")
# pylint: disable=no-member
center: LengthType.Point = pd.Field(description="The center point of the body of revolution.")
profile_curve: List[LengthType.Pair] = pd.Field(
description="The (Axial, Radial) profile of the body of revolution."
)
private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
@pd.field_validator("profile_curve", mode="after")
@classmethod
def _check_radial_profile_is_positive(cls, curve):
first_point = curve[0]
if first_point[1] != 0:
raise ValueError(
f"Expect first profile sample to be (Axial, 0.0). Found invalid point: {str(first_point)}."
)
last_point = curve[-1]
if last_point[1] != 0:
raise ValueError(
f"Expect last profile sample to be (Axial, 0.0). Found invalid point: {str(last_point)}."
)
for profile_point in curve[1:-1]:
if profile_point[1] < 0:
raise ValueError(
f"Expect profile samples to be (Axial, Radial) samples with positive Radial."
f" Found invalid point: {str(profile_point)}."
)
return curve
def _apply_transformation(self, matrix: np.ndarray) -> "AxisymmetricBody":
"""Apply 3x4 transformation matrix with uniform scale validation."""
# Validate uniform scaling
if not _is_uniform_scale(matrix):
scale_factors = _extract_scale_from_matrix(matrix)
raise Flow360ValueError(
f"AxisymmetricBody only supports uniform scaling. "
f"Detected scale factors: {scale_factors}"
)
# Extract uniform scale factor
uniform_scale = _extract_scale_from_matrix(matrix)[0]
# Transform center
center_array = np.asarray(self.center.value)
new_center_array = _transform_point(center_array, matrix)
new_center = type(self.center)(new_center_array, self.center.units)
# Rotate axis
axis_array = np.asarray(self.axis)
transformed_axis = _transform_direction(axis_array, matrix)
new_axis = tuple(transformed_axis / np.linalg.norm(transformed_axis))
# Scale profile curve uniformly
new_profile_curve = []
for point in self.profile_curve:
point_array = np.asarray(point.value)
scaled_point_array = point_array * uniform_scale
new_profile_curve.append(type(point)(scaled_point_array, point.units))
return self.model_copy(
update={
"center": new_center,
"axis": new_axis,
"profile_curve": new_profile_curve,
}
)
class SurfacePrivateAttributes(Flow360BaseModel):
"""
Private attributes for Surface.
TODO: With the amount of private_attribute prefixes we have
TODO: Maybe it makes more sense to lump them together to save storage space?
"""
type_name: Literal["SurfacePrivateAttributes"] = pd.Field(
"SurfacePrivateAttributes", frozen=True
)
bounding_box: BoundingBoxType = pd.Field(description="Bounding box of the surface.")
@final
class Surface(_SurfaceEntityBase):
"""
:class:`Surface` represents a boundary surface in three-dimensional space.
"""
private_attribute_entity_type_name: Literal["Surface"] = pd.Field("Surface", frozen=True)
private_attribute_is_interface: Optional[bool] = pd.Field(
None,
frozen=True,
description="This is required when generated from volume mesh "
+ "but not required when from surface mesh meta.",
)
private_attribute_tag_key: Optional[str] = pd.Field(
None,
description="The tag/attribute string used to group geometry faces to form this `Surface`.",
)
private_attribute_sub_components: Optional[List[str]] = pd.Field(
[], description="The face ids in geometry that composed into this `Surface`."
)
private_attribute_color: Optional[str] = pd.Field(
None, description="Color used for visualization"
)
private_attributes: Optional[SurfacePrivateAttributes] = pd.Field(None)
# Note: private_attribute_id should not be `Optional` anymore.
# B.C. Updater and geometry pipeline will populate it.
def _overlaps(self, ghost_surface_center_y: Optional[float], length_tolerance: float) -> bool:
if self.private_attributes is None:
# Legacy cloud asset.
return False
# pylint: disable=no-member
my_bounding_box = self.private_attributes.bounding_box
if abs(my_bounding_box.ymax - ghost_surface_center_y) > length_tolerance:
return False
if abs(my_bounding_box.ymin - ghost_surface_center_y) > length_tolerance:
return False
return True
def _will_be_deleted_by_mesher(
# pylint: disable=too-many-arguments, too-many-return-statements, too-many-branches
self,
entity_transformation_detected: bool,
farfield_method: Optional[
Literal["auto", "quasi-3d", "quasi-3d-periodic", "user-defined", "wind-tunnel"]
],
global_bounding_box: Optional[BoundingBoxType],
planar_face_tolerance: Optional[float],
half_model_symmetry_plane_center_y: Optional[float],
quasi_3d_symmetry_planes_center_y: Optional[tuple[float]],
farfield_domain_type: Optional[str] = None,
) -> bool:
"""
Check against the automated farfield method and
determine if the current `Surface` will be deleted by the mesher.
"""
if entity_transformation_detected:
# If transformed then the following check will no longer be accurate
# since we do not know the final bounding box for each surface and global model.
return False
if global_bounding_box is None or planar_face_tolerance is None or farfield_method is None:
# VolumeMesh or Geometry/SurfaceMesh with legacy schema.
return False
length_tolerance = global_bounding_box.largest_dimension * planar_face_tolerance
if farfield_domain_type in ("half_body_positive_y", "half_body_negative_y"):
if self.private_attributes is not None:
# pylint: disable=no-member
y_min = self.private_attributes.bounding_box.ymin
y_max = self.private_attributes.bounding_box.ymax
if farfield_domain_type == "half_body_positive_y" and y_max < -length_tolerance:
return True
if farfield_domain_type == "half_body_negative_y" and y_min > length_tolerance:
return True
if farfield_method in ("user-defined", "wind-tunnel"):
# Not applicable to user defined or wind tunnel farfield
return False
if farfield_method == "auto":
if half_model_symmetry_plane_center_y is None:
# Legacy schema.
return False
if farfield_domain_type not in ("half_body_positive_y", "half_body_negative_y") and (
not _auto_symmetric_plane_exists_from_bbox(
global_bounding_box=global_bounding_box,
planar_face_tolerance=planar_face_tolerance,
)
):
return False
return self._overlaps(half_model_symmetry_plane_center_y, length_tolerance)
if farfield_method in ("quasi-3d", "quasi-3d-periodic"):
if quasi_3d_symmetry_planes_center_y is None:
# Legacy schema.
return False
for plane_center_y in quasi_3d_symmetry_planes_center_y:
if self._overlaps(plane_center_y, length_tolerance):
return True
return False
raise ValueError(f"Unknown auto farfield generation method: {farfield_method}.")
@final
class ImportedSurface(EntityBase):
"""ImportedSurface for post-processing"""
private_attribute_entity_type_name: Literal["ImportedSurface"] = pd.Field(
"ImportedSurface", frozen=True
)
private_attribute_sub_components: Optional[List[str]] = pd.Field(
None, description="A list of sub components"
)
file_name: Optional[str] = None
surface_mesh_id: Optional[str] = None
@pd.model_validator(mode="after")
def _populate_id_from_name(self) -> "ImportedSurface":
"""Ensure a deterministic private_attribute_id exists.
CoordinateSystemManager and MirrorManager use private_attribute_id as
a dict key for entity tracking. A deterministic id derived from name
guarantees the same ImportedSurface always resolves to the same id,
even when reconstructed from cloud metadata across sessions.
"""
if self.private_attribute_id is None:
object.__setattr__(self, "private_attribute_id", f"{self.name}_defaultBody")
return self
class GhostSurface(_SurfaceEntityBase):
"""
Represents a boundary surface that may or may not be generated therefore may or may not exist.
It depends on the submitted geometry/Surface mesh. E.g. the symmetry plane in `AutomatedFarfield`.
This is a token/place-holder used only on the Python API side.
All `GhostSurface` entities will be replaced with exact entity instances before simulation.json submission.
"""
name: str = pd.Field(frozen=True)
private_attribute_entity_type_name: Literal["GhostSurface"] = pd.Field(
"GhostSurface", frozen=True
)
class WindTunnelGhostSurface(GhostSurface):
"""Wind tunnel boundary patches."""
private_attribute_entity_type_name: Literal["WindTunnelGhostSurface"] = pd.Field(
"WindTunnelGhostSurface", frozen=True
)
# For frontend: list of floor types that use this boundary patch, or ["all"]
used_by: List[
Literal["StaticFloor", "FullyMovingFloor", "CentralBelt", "WheelBelts", "all"]
] = pd.Field(default_factory=lambda: ["all"], frozen=True)
def exists(self, _) -> bool:
"""Currently, .exists() is only called on automated farfield"""
raise ValueError(".exists should not be called on wind tunnel farfield")
# pylint: disable=missing-class-docstring
@final
class GhostSphere(_SurfaceEntityBase):
private_attribute_entity_type_name: Literal["GhostSphere"] = pd.Field(
"GhostSphere", frozen=True
)
# Note: Making following optional since front end will not carry these over to assigned entities.
center: Optional[List] = pd.Field(None, alias="center")
max_radius: Optional[PositiveFloat] = pd.Field(None, alias="maxRadius")
def exists(self, _) -> bool:
"""Ghost farfield always exists."""
return True
# pylint: disable=missing-class-docstring
@final
class GhostCircularPlane(_SurfaceEntityBase):
private_attribute_entity_type_name: Literal["GhostCircularPlane"] = pd.Field(
"GhostCircularPlane", frozen=True
)
# Note: Making following optional since front end will not carry these over to assigned entities.
center: Optional[List] = pd.Field(None, alias="center")
max_radius: Optional[PositiveFloat] = pd.Field(None, alias="maxRadius")
normal_axis: Optional[List] = pd.Field(None, alias="normalAxis")
def _get_existence_dependency(self, validation_info):
y_max = validation_info.global_bounding_box[1][1]
y_min = validation_info.global_bounding_box[0][1]
largest_dimension = -np.inf
for dim in range(3):
dimension = (
validation_info.global_bounding_box[1][dim]
- validation_info.global_bounding_box[0][dim]
)
largest_dimension = max(largest_dimension, dimension)
tolerance = largest_dimension * validation_info.planar_face_tolerance
return y_min, y_max, tolerance, largest_dimension
def exists(self, validation_info) -> bool:
"""For automated farfield, check mesher logic for symmetric plane existence."""
if self.name != "symmetric":
# Quasi-3D mode, no need to check existence.
return True
if validation_info is None:
raise ValueError("Validation info is required for GhostCircularPlane existence check.")
if validation_info.global_bounding_box is None:
# This likely means the user try to use mesher on old cloud resources.
# We cannot validate if symmetric exists so will let it pass. Pipeline will error out anyway.
return True
if validation_info.will_generate_forced_symmetry_plane():
return True
return _auto_symmetric_plane_exists_from_bbox(
global_bounding_box=validation_info.global_bounding_box,
planar_face_tolerance=validation_info.planar_face_tolerance,
)
def _per_entity_type_validation(self, param_info: ParamsValidationInfo):
"""Validate ghost surface existence and configuration."""
# pylint: disable=import-outside-toplevel
from flow360.component.simulation.validation.validation_utils import (
check_symmetric_boundary_existence,
check_user_defined_farfield_symmetry_existence,
)
# These functions expect a list, so wrap self
check_user_defined_farfield_symmetry_existence([self], param_info)
check_symmetric_boundary_existence([self], param_info)
return self
class SurfacePairBase(Flow360BaseModel):
"""
Base class for surface pair objects.
Subclasses must define a `pair` attribute with the appropriate surface type.
"""
pair: Tuple[_SurfaceEntityBase, _SurfaceEntityBase]
@pd.field_validator("pair", mode="after")
@classmethod
def check_unique(cls, v):
"""Check if pairing with self."""
if v[0].name == v[1].name:
raise ValueError("A surface cannot be paired with itself.")
return v
@pd.model_validator(mode="before")
@classmethod
def _format_input(cls, input_data: Union[dict, list, tuple]):
if isinstance(input_data, (list, tuple)):
return {"pair": input_data}
if isinstance(input_data, dict):
return {"pair": input_data["pair"]}
raise ValueError("Invalid input data.")
def __hash__(self):
return hash(tuple(sorted([self.pair[0].name, self.pair[1].name])))
def __eq__(self, other):
if isinstance(other, self.__class__):
return tuple(sorted([self.pair[0].name, self.pair[1].name])) == tuple(
sorted([other.pair[0].name, other.pair[1].name])
)
return False
def __str__(self):
return ",".join(sorted([self.pair[0].name, self.pair[1].name]))
class SnappyBody(EntityBase):
"""
Represents a group of faces forming a body for snappyHexMesh.
Bodies and their regions are defined in the ASCII STL file by using the solid -> endsolid"
keywords with a body::region naming scheme.
"""
private_attribute_entity_type_name: Literal["SnappyBody"] = pd.Field("SnappyBody", frozen=True)
private_attribute_id: str = pd.Field(
default_factory=generate_uuid,
frozen=True,
description="Unique identifier for the entity. Used by front end to track entities and enable auto update etc.",
)
surfaces: List[Surface] = pd.Field()
def __getitem__(self, key: str):
if len(self.surfaces) == 1 and ("::" not in self.surfaces[0].name):
regex = _naming_pattern_handler(pattern=key)
else:
regex = _naming_pattern_handler(pattern=f"{self.name}::{key}")
matched_surfaces = [entity for entity in self.surfaces if regex.match(entity.name)]
if not matched_surfaces:
raise KeyError(
f"No entity found in registry for parent entity: {self.name} with given name/naming pattern: '{key}'."
)
return matched_surfaces
[docs]
@final
class SeedpointVolume(_VolumeEntityBase):
"""
Represents a separate zone in the mesh, defined by a point inside it.
To be used only with snappyHexMesh.
"""
# pylint: disable=no-member
private_attribute_entity_type_name: Literal["SeedpointVolume"] = pd.Field(
"SeedpointVolume", frozen=True
)
type: Literal["SeedpointVolume"] = pd.Field("SeedpointVolume", frozen=True)
point_in_mesh: LengthType.Point = pd.Field(
description="Seedpoint for a main fluid zone in snappyHexMesh."
)
axes: Optional[OrthogonalAxes] = pd.Field(
None, description="Principal axes definition when using with PorousMedium"
) # Porous media support
axis: Optional[Axis] = pd.Field(None) # Rotation support
center: Optional[LengthType.Point] = pd.Field(None, description="") # Rotation support
private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
def _per_entity_type_validation(self, param_info: ParamsValidationInfo):
"""Validate that SeedpointVolume is listed in meshing->volume_zones."""
if self.name not in param_info.to_be_generated_custom_volumes:
raise ValueError(
f"SeedpointVolume {self.name} is not listed under meshing->volume_zones(or zones)"
"->CustomZones."
)
return self
VolumeEntityTypes = Union[GenericVolume, Cylinder, Box, str]
class SurfacePair(SurfacePairBase):
"""
Represents a pair of surfaces.
Attributes:
pair (Tuple[Surface, Surface]): A tuple containing two Surface objects representing the pair.
"""
pair: Tuple[Surface, Surface]
class GhostSurfacePair(SurfacePairBase):
"""
Represents a pair of ghost surfaces.
Attributes:
pair (Tuple[GhostSurfaceType, GhostSurfaceType]):
A tuple containing two GhostSurfaceType objects representing the pair.
GhostSurface is for Python API, GhostCircularPlane is for Web UI.
"""
GhostSurfaceType: ClassVar[type] = Annotated[
Union[GhostSurface, GhostCircularPlane],
pd.Field(discriminator="private_attribute_entity_type_name"),
]
pair: Tuple[GhostSurfaceType, GhostSurfaceType]
[docs]
@final
class CustomVolume(_VolumeEntityBase):
"""
CustomVolume is a volume zone defined by its surrounding surfaces.
It will be generated by the volume mesher.
"""
private_attribute_entity_type_name: Literal["CustomVolume"] = pd.Field(
"CustomVolume", frozen=True
)
boundaries: EntityList[Surface, WindTunnelGhostSurface] = pd.Field(
description="The surfaces that define the boundaries of the custom volume."
)
private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
axes: Optional[OrthogonalAxes] = pd.Field(None, description="") # Porous media support
axis: Optional[Axis] = pd.Field(None) # Rotation support
# pylint: disable=no-member
center: Optional[LengthType.Point] = pd.Field(None, description="") # Rotation support
@contextual_field_validator("boundaries", mode="after")
@classmethod
def ensure_unique_boundary_names(cls, v, param_info: ParamsValidationInfo):
"""Check if the boundaries have different names within a CustomVolume."""
expanded = param_info.expand_entity_list(v)
if len(expanded) != len({boundary.name for boundary in expanded}):
raise ValueError("The boundaries of a CustomVolume must have different names.")
return v
@contextual_model_validator(mode="after")
def ensure_beta_mesher_and_compatible_farfield(self, param_info: ParamsValidationInfo):
"""Check if the beta mesher is enabled and that the user is using user-defined or wind tunnel farfield."""
if param_info.is_beta_mesher and param_info.farfield_method in (
"user-defined",
"wind-tunnel",
):
return self
raise ValueError(
"CustomVolume is supported only when the beta mesher is enabled "
"and either a user-defined farfield or a wind tunnel farfield is enabled."
)
def _apply_transformation(self, matrix: np.ndarray) -> "CustomVolume":
"""Apply rotation from transformation matrix to axes only (no translation or scaling)."""
if self.axes is None:
# No axes to transform
return self
# Extract pure rotation matrix (ignore translation and scaling)
rotation_matrix = _extract_rotation_matrix(matrix)
# Rotate both axes
x_axis_array = np.asarray(self.axes[0])
y_axis_array = np.asarray(self.axes[1])
new_x_axis = rotation_matrix @ x_axis_array
new_y_axis = rotation_matrix @ y_axis_array
new_axes = (tuple(new_x_axis), tuple(new_y_axis))
return self.model_copy(update={"axes": new_axes})
def _per_entity_type_validation(self, param_info: ParamsValidationInfo):
"""Validate that CustomVolume is listed in meshing->volume_zones."""
if self.name not in param_info.to_be_generated_custom_volumes:
raise ValueError(
f"CustomVolume {self.name} is not listed under meshing->volume_zones(or zones)"
"->CustomZones."
)
return self
class _MirroredEntityBase(EntityBase, metaclass=ABCMeta):
"""
Base class for mirrored entities (MirroredSurface, MirroredGeometryBodyGroup).
Provides common validation logic for checking source entity and mirror plane existence.
"""
mirror_plane_id: str = pd.Field(description="ID of the mirror plane.")
@property
@abstractmethod
def source_entity_id_field_name(self) -> str:
"""Return the name of the field containing the source entity ID."""
@property
@abstractmethod
def source_entity_type_name(self) -> str:
"""Return the entity type name of the source entity."""
def _manual_assignment_validation(self, param_info: ParamsValidationInfo):
"""Validate that source entity and mirror plane still exist."""
# pylint: disable=import-outside-toplevel
from flow360.component.simulation.validation.validation_context import (
add_validation_warning,
)
registry = param_info.get_entity_registry()
if registry is None:
return self
# Get source entity ID using the field name from subclass
source_entity_id = getattr(self, self.source_entity_id_field_name)
# Check if source entity exists
source_entity = registry.find_by_type_name_and_id(
entity_type=self.source_entity_type_name, entity_id=source_entity_id
)
if source_entity is None:
add_validation_warning(
f"{self.__class__.__name__} '{self.name}' references non-existent source "
f"{self.source_entity_type_name.lower()} (id={source_entity_id}). "
"This entity will be removed."
)
return None
# Check if mirror plane exists
mirror_plane = registry.find_by_type_name_and_id(
entity_type="MirrorPlane", entity_id=self.mirror_plane_id
)
if mirror_plane is None:
add_validation_warning(
f"{self.__class__.__name__} '{self.name}' references non-existent mirror plane "
f"(id={self.mirror_plane_id}). This entity will be removed."
)
return None
return self
class MirroredSurface(_SurfaceEntityBase, _MirroredEntityBase):
"""
:class:`MirroredSurface` class for representing a mirrored surface.
"""
name: str = pd.Field()
surface_id: str = pd.Field(
description="ID of the original surface being mirrored.", frozen=True
)
mirror_plane_id: str = pd.Field(description="ID of the mirror plane to mirror the surface.")
private_attribute_entity_type_name: Literal["MirroredSurface"] = pd.Field(
"MirroredSurface", frozen=True
)
private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
# Private attribute used for draft-only bookkeeping. This must NOT affect schema or serialization.
_geometry_body_group_id: Optional[str] = pd.PrivateAttr(default=None)
@property
def source_entity_id_field_name(self) -> str:
"""Return the name of the field containing the source entity ID."""
return "surface_id"
@property
def source_entity_type_name(self) -> str:
"""Return the entity type name of the source entity."""
return "Surface"
class MirroredGeometryBodyGroup(_MirroredEntityBase):
"""
:class:`MirroredGeometryBodyGroup` class for representing a mirrored geometry body group.
"""
name: str = pd.Field()
geometry_body_group_id: str = pd.Field(description="ID of the geometry body group to mirror.")
mirror_plane_id: str = pd.Field(
description="ID of the mirror plane to mirror the geometry body group."
)
private_attribute_entity_type_name: Literal["MirroredGeometryBodyGroup"] = pd.Field(
"MirroredGeometryBodyGroup", frozen=True
)
private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
@property
def source_entity_id_field_name(self) -> str:
"""Return the name of the field containing the source entity ID."""
return "geometry_body_group_id"
@property
def source_entity_type_name(self) -> str:
"""Return the entity type name of the source entity."""
return "GeometryBodyGroup"