# Source code for flow360.component.simulation.primitives

# pylint: disable=too-many-lines
"""
Primitive type definitions for simulation entities.
"""

import re
from abc import ABCMeta, abstractmethod
from typing import Annotated, ClassVar, List, Literal, Optional, Tuple, Union, final

import numpy as np
import pydantic as pd
from pydantic import PositiveFloat
from typing_extensions import Self

import flow360.component.simulation.units as u
from flow360.component.simulation.entity_operation import (
    _extract_rotation_matrix,
    _extract_scale_from_matrix,
    _is_uniform_scale,
    _rotation_matrix_to_axis_angle,
    _transform_direction,
    _transform_point,
    rotation_matrix_from_axis_and_angle,
)
from flow360.component.simulation.framework.base_model import Flow360BaseModel
from flow360.component.simulation.framework.entity_base import EntityBase, EntityList
from flow360.component.simulation.framework.entity_utils import generate_uuid
from flow360.component.simulation.framework.multi_constructor_model_base import (
    MultiConstructorBaseModel,
)
from flow360.component.simulation.framework.unique_list import UniqueStringList
from flow360.component.simulation.unit_system import AngleType, AreaType, LengthType
from flow360.component.simulation.user_code.core.types import ValueOrExpression
from flow360.component.simulation.utils import BoundingBoxType, model_attribute_unlock
from flow360.component.simulation.validation.validation_context import (
    ParamsValidationInfo,
    contextual_field_validator,
    contextual_model_validator,
)
from flow360.component.types import Axis
from flow360.component.utils import _naming_pattern_handler
from flow360.exceptions import Flow360DeprecationError, Flow360ValueError

# Placeholder boundary name returned (as the only list item) by
# `_get_generated_boundary_names` when no boundary in the metadata matches.
BOUNDARY_FULL_NAME_WHEN_NOT_FOUND = "This boundary does not exist!!!"


def _get_generated_boundary_names(surface_name: str, volume_mesh_meta: dict[str, dict]) -> list:
    """

    Returns all the boundaries that are eventually generated by name matching in the volume mesh metadata.

    May return multiple boundaries when the original one is split into multiple boundaries.

    """
    full_boundary_names = []

    for zone_name, zone_meta in volume_mesh_meta["zones"].items():
        for existing_boundary_name in zone_meta["boundaryNames"]:
            pattern = re.escape(zone_name) + r"/(.*)"
            match = re.search(pattern, existing_boundary_name)
            if (
                match is not None and match.group(1) == surface_name
            ) or existing_boundary_name == surface_name:
                full_boundary_names.append(existing_boundary_name)

    # Not found
    if not full_boundary_names:
        return [BOUNDARY_FULL_NAME_WHEN_NOT_FOUND]

    return full_boundary_names


def _check_axis_is_orthogonal(axis_pair: Tuple[Axis, Axis]) -> Tuple[Axis, Axis]:
    axis_1, axis_2 = np.array(axis_pair[0]), np.array(axis_pair[1])
    dot_product = np.dot(axis_1, axis_2)
    if not np.isclose(dot_product, 0, atol=1e-3):
        raise ValueError(f"The two axes are not orthogonal, dot product is {dot_product}.")
    axis_2 -= dot_product * axis_1
    axis_2 /= np.linalg.norm(axis_2)
    return (tuple(axis_1), tuple(axis_2))


def _auto_symmetric_plane_exists_from_bbox(
    *,
    global_bounding_box: BoundingBoxType,
    planar_face_tolerance: float,
) -> bool:
    """
    Determine whether automated farfield logic will generate a `symmetric` plane
    from global bounding box extents and planar-face tolerance.
    """

    y_min = global_bounding_box[0][1]
    y_max = global_bounding_box[1][1]
    tolerance = global_bounding_box.largest_dimension * planar_face_tolerance

    positive_half = abs(y_min) < tolerance < y_max
    negative_half = abs(y_max) < tolerance and y_min < -tolerance

    return positive_half or negative_half


# A pair of axes that, after the regular `Axis` validation, is additionally passed
# through `_check_axis_is_orthogonal` (orthogonality check on the pair).
OrthogonalAxes = Annotated[Tuple[Axis, Axis], pd.AfterValidator(_check_axis_is_orthogonal)]


class ReferenceGeometry(Flow360BaseModel):
    """
    :class:`ReferenceGeometry` class contains all geometrical related reference values.

    Example
    -------
    >>> ReferenceGeometry(
    ...     moment_center=(1, 2, 1) * u.m,
    ...     moment_length=(1, 1, 1) * u.m,
    ...     area=1.5 * u.m**2
    ... )
    >>> ReferenceGeometry(
    ...     moment_center=(1, 2, 1) * u.m,
    ...     moment_length=1 * u.m,
    ...     area=1.5 * u.m**2
    ... )  # Equivalent to above

    ====
    """

    # pylint: disable=no-member
    moment_center: Optional[LengthType.Point] = pd.Field(
        None, description="The x, y, z coordinate of moment center."
    )
    moment_length: Optional[Union[LengthType.Positive, LengthType.PositiveVector]] = pd.Field(
        None, description="The x, y, z component-wise moment reference lengths."
    )
    area: Optional[ValueOrExpression[AreaType.Positive]] = pd.Field(
        None, description="The reference area of the geometry."
    )
    private_attribute_area_settings: Optional[dict] = pd.Field(None)

    @classmethod
    def fill_defaults(cls, ref, params):  # type: ignore[override]
        """Build a fully specified ReferenceGeometry, taking fallbacks from SimulationParams.

        Every value that `ref` leaves unset (or all of them when `ref` is None) becomes:
        - area: 1 * (base_length)**2
        - moment_center: (0,0,0) * base_length
        - moment_length: (1,1,1) * base_length
        """
        # Keeping the fallback rules in this single helper means consumers never
        # have to re-implement them; `params.base_length` supplies the length unit.
        length_unit = params.base_length

        # An absent reference behaves exactly like an empty one.
        source = cls() if ref is None else ref

        return cls(
            area=source.area if source.area is not None else 1.0 * (length_unit**2),
            moment_center=(
                source.moment_center
                if source.moment_center is not None
                else (0, 0, 0) * length_unit
            ),
            moment_length=(
                source.moment_length
                if source.moment_length is not None
                else (1.0, 1.0, 1.0) * length_unit
            ),
        )
class GeometryBodyGroup(EntityBase):
    """
    :class:`GeometryBodyGroup` represents a collection of bodies that are grouped for meshing and
    coordinate-system-based transformation.
    """

    private_attribute_tag_key: str = pd.Field(
        description="The tag/attribute string used to group bodies.",
    )
    private_attribute_entity_type_name: Literal["GeometryBodyGroup"] = pd.Field(
        "GeometryBodyGroup", frozen=True
    )
    private_attribute_sub_components: List[str] = pd.Field(
        description="A list of body IDs which constitutes the current body group"
    )
    private_attribute_color: Optional[str] = pd.Field(
        None, description="Color used for visualization"
    )
    mesh_exterior: bool = pd.Field(
        True,
        description="Option to define whether to mesh exterior or interior of body group in geometry AI."
        "Note that this is a beta feature and the interface might change in future releases.",
    )

    @property
    def transformation(self):
        """Deprecated property. Reading it always raises `Flow360DeprecationError`."""
        raise Flow360DeprecationError(
            "GeometryBodyGroup.transformation is deprecated and has been removed. "
            "Please use CoordinateSystem for transformations instead."
        )

    @transformation.setter
    def transformation(self, value):
        """Deprecated property setter. Assigning to it always raises `Flow360DeprecationError`."""
        raise Flow360DeprecationError(
            "GeometryBodyGroup.transformation is deprecated and has been removed. "
            "Please use CoordinateSystem for transformations instead."
        )


class _VolumeEntityBase(EntityBase, metaclass=ABCMeta):
    """All volumetric entities should inherit from this class."""

    private_attribute_zone_boundary_names: UniqueStringList = pd.Field(
        UniqueStringList(),
        frozen=True,
        description="Boundary names of the zone WITH the prepending zone name.",
    )
    private_attribute_full_name: Optional[str] = pd.Field(None, frozen=True)

    def _is_volume_zone(self) -> bool:
        """This is not a zone if zone boundaries are not defined. For validation usage."""
        # NOTE(review): the field default is `UniqueStringList()`, not None, so this
        # looks like it is True unless None is assigned explicitly - confirm intent.
        return self.private_attribute_zone_boundary_names is not None

    def _update_entity_info_with_metadata(self, volume_mesh_meta_data: dict[str, dict]) -> None:
        """
        Update the full name of zones once the volume mesh is done.
        e.g. rotating_cylinder --> rotatingBlock-rotating_cylinder

        Only the first matching zone is used; the frozen full name and zone boundary
        names are overwritten from that zone's metadata.
        """
        entity_name = self.name
        for zone_full_name, zone_meta in volume_mesh_meta_data["zones"].items():
            pattern = r"rotatingBlock-" + re.escape(entity_name)
            if entity_name == "__farfield_zone_name_not_properly_set_yet":
                # We have hardcoded name for farfield zone.
                pattern = r"stationaryBlock|fluid"
            match = re.search(pattern, zone_full_name)
            if match is not None or entity_name == zone_full_name:
                # Both fields are declared frozen, hence the explicit unlock.
                with model_attribute_unlock(self, "private_attribute_full_name"):
                    self.private_attribute_full_name = zone_full_name
                with model_attribute_unlock(self, "private_attribute_zone_boundary_names"):
                    self.private_attribute_zone_boundary_names = UniqueStringList(
                        items=zone_meta["boundaryNames"]
                    )
                break

    @property
    def full_name(self):
        """Gets the full name which includes the zone name"""
        # Before the metadata update no full name is known; fall back to the plain name.
        if self.private_attribute_full_name is None:
            return self.name
        return self.private_attribute_full_name


# Common base of surface-like entities: stores the boundary full name that becomes
# known once volume mesh metadata is available.
class _SurfaceEntityBase(EntityBase, metaclass=ABCMeta):
    private_attribute_full_name: Optional[str] = pd.Field(None, frozen=True)

    def _update_entity_info_with_metadata(
        self, volume_mesh_meta_data: dict
    ) -> Optional[List["_SurfaceEntityBase"]]:
        """
        Update parent zone name once the volume mesh is done.

        The first generated boundary name becomes this entity's full name. Every further
        generated name yields a copy of this entity carrying that name; the copies are
        returned as a list, or None when there are none.
        """
        updated_boundary_names = _get_generated_boundary_names(self.name, volume_mesh_meta_data)

        with model_attribute_unlock(self, "private_attribute_full_name"):
            # pop(0) consumes the first name so that only the extra ones remain below.
            self.private_attribute_full_name = updated_boundary_names.pop(0)

        multiplication_result = []
        for new_boundary_name in updated_boundary_names:
            multiplication_result.append(
                self.copy(
                    update={
                        "name": new_boundary_name,
                        "private_attribute_full_name": new_boundary_name,
                    }
                )
            )

        return multiplication_result if multiplication_result else None

    @property
    def full_name(self):
        """Gets the full name which includes the zone name"""
        if self.private_attribute_full_name is None:
            return self.name
        return self.private_attribute_full_name


@final
class Edge(EntityBase):
    """
    Edge which contains a set of grouped edges from geometry.
    """

    private_attribute_entity_type_name: Literal["Edge"] = pd.Field("Edge", frozen=True)
    private_attribute_tag_key: Optional[str] = pd.Field(
        None,
        description="The tag/attribute string used to group geometry edges to form this `Edge`.",
    )
    private_attribute_sub_components: Optional[List[str]] = pd.Field(
        [], description="The edge ids in geometry that composed into this `Edge`."
    )


@final
class GenericVolume(_VolumeEntityBase):
    """
    Do not expose.
    This type of entity will get auto-constructed by assets when loading metadata.
    By design these GenericVolume entities should only contain basic connectivity/mesh information.
    These can only come from uploaded volume mesh.
    """

    private_attribute_entity_type_name: Literal["GenericVolume"] = pd.Field(
        "GenericVolume", frozen=True
    )
    axes: Optional[OrthogonalAxes] = pd.Field(None, description="")  # Porous media support
    axis: Optional[Axis] = pd.Field(None)  # Rotation support
    # pylint: disable=no-member
    center: Optional[LengthType.Point] = pd.Field(None, description="")  # Rotation support


class BoxCache(Flow360BaseModel):
    """BoxCache"""

    # `axes` will always exist as it needs to be used. So `axes` is more like a storage than input cache.
    axes: Optional[OrthogonalAxes] = pd.Field(None)
    # pylint: disable=no-member
    center: Optional[LengthType.Point] = pd.Field(None)
    size: Optional[LengthType.PositiveVector] = pd.Field(None)
    name: Optional[str] = pd.Field(None)
@final
class Box(MultiConstructorBaseModel, _VolumeEntityBase):
    """
    :class:`Box` class represents a box in three-dimensional space.

    Example
    -------
    >>> fl.Box(
    ...     name="box",
    ...     axis_of_rotation = (1, 0, 0),
    ...     angle_of_rotation = 45 * fl.u.deg,
    ...     center = (1, 1, 1) * fl.u.m,
    ...     size=(0.2, 0.3, 2) * fl.u.m,
    ... )

    Define a box using principal axes:

    >>> fl.Box.from_principal_axes(
    ...     name="box",
    ...     axes=[(0, 1, 0), (0, 0, 1)],
    ...     center=(0, 0, 0) * fl.u.m,
    ...     size=(0.2, 0.3, 2) * fl.u.m,
    ... )

    ====
    """

    type_name: Literal["Box"] = pd.Field("Box", frozen=True)
    # pylint: disable=no-member
    center: LengthType.Point = pd.Field(description="The coordinates of the center of the box.")
    size: LengthType.PositiveVector = pd.Field(
        description="The dimensions of the box (length, width, height)."
    )
    axis_of_rotation: Axis = pd.Field(
        default=(0, 0, 1),
        description="The rotation axis. Cannot change once specified.",
        frozen=True,
    )
    angle_of_rotation: AngleType = pd.Field(
        default=0 * u.degree,
        description="The rotation angle. Cannot change once specified.",
        frozen=True,
    )
    private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
    private_attribute_input_cache: BoxCache = pd.Field(BoxCache(), frozen=True)
    private_attribute_entity_type_name: Literal["Box"] = pd.Field("Box", frozen=True)

    # pylint: disable=no-self-argument
    @MultiConstructorBaseModel.model_constructor
    @pd.validate_call
    def from_principal_axes(
        cls,
        name: str,
        center: LengthType.Point,
        size: LengthType.PositiveVector,
        axes: OrthogonalAxes,
    ):
        """
        Construct box from principal axes

        The two given axes plus their cross product form the columns of a rotation
        matrix, which is converted to the axis/angle pair stored on the box.
        """
        # validate
        x_axis, y_axis = np.array(axes[0]), np.array(axes[1])
        z_axis = np.cross(x_axis, y_axis)

        # Rows are the axes, so the transpose puts each axis in a column.
        rotation_matrix = np.transpose(np.asarray([x_axis, y_axis, z_axis], dtype=np.float64))

        # Calculate the rotation axis n using numpy instead of scipy
        # The rotation axis is the eigenvector belonging to a real eigenvalue.
        eigvals, eigvecs = np.linalg.eig(rotation_matrix)
        axis = np.real(eigvecs[:, np.where(np.isreal(eigvals))])
        if axis.shape[2] > 1:  # in case of 0 rotation angle
            # Several real eigenvalues were found; keep the first eigenvector.
            axis = axis[:, :, 0]
        axis = np.ndarray.flatten(axis)

        # Half of the summed absolute phase of the eigenvalues is used as the angle magnitude.
        angle = np.sum(abs(np.angle(eigvals))) / 2

        # Find correct angle
        # The product below is entry (0, 0) of rotation_matrix @ matrix_test; the sign of
        # the angle is flipped when that entry is close to 1.
        matrix_test = rotation_matrix_from_axis_and_angle(axis, angle)
        angle *= -1 if np.isclose(rotation_matrix[0, :] @ matrix_test[:, 0], 1) else 1

        # pylint: disable=not-callable
        return cls(
            name=name,
            center=center,
            size=size,
            axis_of_rotation=tuple(axis),
            angle_of_rotation=angle * u.rad,
        )

    @pd.model_validator(mode="after")
    def _convert_axis_and_angle_to_coordinate_axes(self) -> Self:
        """
        Converts the Box object's axis and angle orientation information to a
        coordinate axes representation.

        The result is stored in `private_attribute_input_cache.axes`, and only when
        that cache entry is still empty.
        """
        # Ensure the axis is a numpy array
        if not self.private_attribute_input_cache.axes:
            axis = np.asarray(self.axis_of_rotation, dtype=np.float64)
            angle = self.angle_of_rotation.to("rad").v.item()

            # Normalize the axis vector
            axis = axis / np.linalg.norm(axis)

            rotation_matrix = rotation_matrix_from_axis_and_angle(axis, angle)

            # Only the first two columns are kept, stored as two row vectors.
            # pylint: disable=assigning-non-slot
            self.private_attribute_input_cache.axes = np.transpose(rotation_matrix[:, :2]).tolist()

        return self

    @property
    def axes(self):
        """Return the axes that the box is aligned with."""
        return self.private_attribute_input_cache.axes

    @pd.field_validator("center", "size", mode="after")
    @classmethod
    def _update_input_cache(cls, value, info: pd.ValidationInfo):
        # Mirror the validated `center` / `size` value into the input cache.
        # Relies on `private_attribute_input_cache` already being present in `info.data`.
        setattr(info.data["private_attribute_input_cache"], info.field_name, value)
        return value

    def _apply_transformation(self, matrix: np.ndarray) -> "Box":
        """Apply 3x4 transformation matrix with uniform scale validation and rotation composition.

        Returns a copy of the box with transformed center, composed rotation and
        scaled size. Raises `Flow360ValueError` when the scaling is not uniform.
        """
        # Validate uniform scaling
        if not _is_uniform_scale(matrix):
            scale_factors = _extract_scale_from_matrix(matrix)
            raise Flow360ValueError(
                f"Box only supports uniform scaling. "
                f"Detected scale factors: {scale_factors}"
            )

        # Extract uniform scale factor
        uniform_scale = _extract_scale_from_matrix(matrix)[0]

        # Transform center
        center_array = np.asarray(self.center.value)
        new_center_array = _transform_point(center_array, matrix)
        # Rebuild the quantity with the original type and units.
        new_center = type(self.center)(new_center_array, self.center.units)

        # Combine rotations: existing rotation + transformation rotation
        # Step 1: Get existing rotation matrix from axis-angle
        existing_axis = np.asarray(self.axis_of_rotation, dtype=np.float64)
        existing_axis = existing_axis / np.linalg.norm(existing_axis)
        existing_angle = self.angle_of_rotation.to("rad").v.item()
        rot_existing = rotation_matrix_from_axis_and_angle(existing_axis, existing_angle)

        # Step 2: Extract pure rotation from transformation matrix
        rot_transform = _extract_rotation_matrix(matrix)

        # Step 3: Combine rotations (apply transformation rotation to existing)
        rot_combined = rot_transform @ rot_existing

        # Step 4: Extract new axis-angle from combined rotation
        new_axis, new_angle = _rotation_matrix_to_axis_angle(rot_combined)

        # Scale size uniformly
        new_size = self.size * uniform_scale

        # NOTE(review): model_copy(update=...) presumably skips validators, so
        # `private_attribute_input_cache` (and therefore `.axes`) may still describe the
        # box before the transformation - confirm whether callers rely on it afterwards.
        return self.model_copy(
            update={
                "center": new_center,
                "axis_of_rotation": tuple(new_axis),
                "angle_of_rotation": new_angle * u.rad,
                "size": new_size,
            }
        )
@final
class Cylinder(_VolumeEntityBase):
    """
    :class:`Cylinder` class represents a cylinder in three-dimensional space.

    Example
    -------
    >>> fl.Cylinder(
    ...     name="bet_disk_volume",
    ...     center=(0, 0, 0) * fl.u.inch,
    ...     axis=(0, 0, 1),
    ...     outer_radius=150 * fl.u.inch,
    ...     height=15 * fl.u.inch,
    ... )

    ====
    """

    private_attribute_entity_type_name: Literal["Cylinder"] = pd.Field("Cylinder", frozen=True)
    axis: Axis = pd.Field(description="The axis of the cylinder.")
    # pylint: disable=no-member
    center: LengthType.Point = pd.Field(description="The center point of the cylinder.")
    height: LengthType.Positive = pd.Field(description="The height of the cylinder.")
    inner_radius: Optional[LengthType.NonNegative] = pd.Field(
        0 * u.m, description="The inner radius of the cylinder."
    )
    outer_radius: LengthType.Positive = pd.Field(description="The outer radius of the cylinder.")
    private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)

    @pd.model_validator(mode="after")
    def _check_inner_radius_is_less_than_outer_radius(self) -> Self:
        """Reject a cylinder whose inner radius is not strictly below its outer radius."""
        # Nothing to compare when no inner radius is given.
        if self.inner_radius is None:
            return self
        if self.inner_radius >= self.outer_radius:
            raise ValueError(
                f"Cylinder inner radius ({self.inner_radius}) must be less than outer radius ({self.outer_radius})."
            )
        return self

    def _apply_transformation(self, matrix: np.ndarray) -> "Cylinder":
        """Return a transformed copy of the cylinder for a 3x4 matrix with uniform scale.

        The center is moved as a point, the axis is rotated as a direction and
        re-normalized, and all lengths are multiplied by the uniform scale factor.
        Raises `Flow360ValueError` when the matrix scales non-uniformly.
        """
        if not _is_uniform_scale(matrix):
            scale_factors = _extract_scale_from_matrix(matrix)
            raise Flow360ValueError(
                f"Cylinder only supports uniform scaling. "
                f"Detected scale factors: {scale_factors}"
            )

        # With uniform scaling any component of the scale vector will do.
        scale = _extract_scale_from_matrix(matrix)[0]

        # Move the center, keeping the quantity type and units of the original.
        moved_center = _transform_point(np.asarray(self.center.value), matrix)
        center_after = type(self.center)(moved_center, self.center.units)

        # Rotate the axis and bring it back to unit length.
        rotated_axis = _transform_direction(np.asarray(self.axis), matrix)
        axis_after = tuple(rotated_axis / np.linalg.norm(rotated_axis))

        height_after = self.height * scale
        outer_radius_after = self.outer_radius * scale
        if self.inner_radius is not None:
            inner_radius_after = self.inner_radius * scale
        else:
            inner_radius_after = None

        changes = {
            "center": center_after,
            "axis": axis_after,
            "height": height_after,
            "outer_radius": outer_radius_after,
            "inner_radius": inner_radius_after,
        }
        return self.model_copy(update=changes)
@final
class AxisymmetricBody(_VolumeEntityBase):
    """
    :class:`AxisymmetricBody` class represents a generic body of revolution in three-dimensional space,
    represented as a list[(Axial Position, Radial Extent)] profile polyline with arbitrary center and axial direction.
    Expect first and last profile samples to connect to axis, i.e., have radius = 0.

    Example
    -------
    >>> fl.AxisymmetricBody(
    ...     name="cone_frustum_body",
    ...     center=(0, 0, 0) * fl.u.inch,
    ...     axis=(0, 0, 1),
    ...     profile_curve = [(-1, 0) * fl.u.inch, (-1, 1) * fl.u.inch, (1, 2) * fl.u.inch, (1, 0) * fl.u.inch]
    ... )

    ====
    """

    private_attribute_entity_type_name: Literal["AxisymmetricBody"] = pd.Field(
        "AxisymmetricBody", frozen=True
    )
    axis: Axis = pd.Field(description="The axis of the body of revolution.")
    # pylint: disable=no-member
    center: LengthType.Point = pd.Field(description="The center point of the body of revolution.")
    profile_curve: List[LengthType.Pair] = pd.Field(
        description="The (Axial, Radial) profile of the body of revolution."
    )
    private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)

    @pd.field_validator("profile_curve", mode="after")
    @classmethod
    def _check_radial_profile_is_positive(cls, curve):
        """Validate the (Axial, Radial) profile samples.

        The first and last samples must have a radial value of 0 and no sample in
        between may have a negative radial value.

        Raises
        ------
        ValueError
            If the profile is empty or any of the above conditions is violated.
        """
        if not curve:
            # Indexing an empty profile would raise IndexError, which is not reported
            # as a validation error; fail with ValueError like the other checks.
            raise ValueError("Expect profile_curve to contain at least one (Axial, Radial) sample.")

        first_point = curve[0]
        if first_point[1] != 0:
            raise ValueError(
                f"Expect first profile sample to be (Axial, 0.0). Found invalid point: {str(first_point)}."
            )

        last_point = curve[-1]
        if last_point[1] != 0:
            raise ValueError(
                f"Expect last profile sample to be (Axial, 0.0). Found invalid point: {str(last_point)}."
            )

        # End points were checked above; only the interior samples remain.
        for profile_point in curve[1:-1]:
            if profile_point[1] < 0:
                raise ValueError(
                    f"Expect profile samples to be (Axial, Radial) samples with positive Radial."
                    f" Found invalid point: {str(profile_point)}."
                )

        return curve

    def _apply_transformation(self, matrix: np.ndarray) -> "AxisymmetricBody":
        """Apply 3x4 transformation matrix with uniform scale validation.

        Returns a copy with the center moved as a point, the axis rotated and
        re-normalized, and every profile sample multiplied by the uniform scale.
        Raises `Flow360ValueError` when the matrix scales non-uniformly.
        """
        # Validate uniform scaling
        if not _is_uniform_scale(matrix):
            scale_factors = _extract_scale_from_matrix(matrix)
            raise Flow360ValueError(
                f"AxisymmetricBody only supports uniform scaling. "
                f"Detected scale factors: {scale_factors}"
            )

        # Extract uniform scale factor
        uniform_scale = _extract_scale_from_matrix(matrix)[0]

        # Transform center, keeping the quantity type and units of the original
        center_array = np.asarray(self.center.value)
        new_center_array = _transform_point(center_array, matrix)
        new_center = type(self.center)(new_center_array, self.center.units)

        # Rotate axis
        axis_array = np.asarray(self.axis)
        transformed_axis = _transform_direction(axis_array, matrix)
        new_axis = tuple(transformed_axis / np.linalg.norm(transformed_axis))

        # Scale profile curve uniformly; each sample keeps its own type and units
        new_profile_curve = [
            type(point)(np.asarray(point.value) * uniform_scale, point.units)
            for point in self.profile_curve
        ]

        return self.model_copy(
            update={
                "center": new_center,
                "axis": new_axis,
                "profile_curve": new_profile_curve,
            }
        )
class SurfacePrivateAttributes(Flow360BaseModel):
    """
    Private attributes for Surface.
    TODO: With the amount of private_attribute prefixes we have
    TODO: Maybe it makes more sense to lump them together to save storage space?
    """

    type_name: Literal["SurfacePrivateAttributes"] = pd.Field(
        "SurfacePrivateAttributes", frozen=True
    )
    bounding_box: BoundingBoxType = pd.Field(description="Bounding box of the surface.")


@final
class Surface(_SurfaceEntityBase):
    """
    :class:`Surface` represents a boundary surface in three-dimensional space.
    """

    private_attribute_entity_type_name: Literal["Surface"] = pd.Field("Surface", frozen=True)
    private_attribute_is_interface: Optional[bool] = pd.Field(
        None,
        frozen=True,
        description="This is required when generated from volume mesh "
        + "but not required when from surface mesh meta.",
    )
    private_attribute_tag_key: Optional[str] = pd.Field(
        None,
        description="The tag/attribute string used to group geometry faces to form this `Surface`.",
    )
    private_attribute_sub_components: Optional[List[str]] = pd.Field(
        [], description="The face ids in geometry that composed into this `Surface`."
    )
    private_attribute_color: Optional[str] = pd.Field(
        None, description="Color used for visualization"
    )
    private_attributes: Optional[SurfacePrivateAttributes] = pd.Field(None)

    # Note: private_attribute_id should not be `Optional` anymore.
    # B.C. Updater and geometry pipeline will populate it.

    def _overlaps(self, ghost_surface_center_y: Optional[float], length_tolerance: float) -> bool:
        """Tell whether this surface lies on the plane y = `ghost_surface_center_y`.

        True only when both the ymin and ymax of the surface bounding box are within
        `length_tolerance` of the given y. Always False when no bounding box is stored.
        Callers are expected to pass a non-None `ghost_surface_center_y`.
        """
        if self.private_attributes is None:
            # Legacy cloud asset.
            return False
        # pylint: disable=no-member
        my_bounding_box = self.private_attributes.bounding_box
        if abs(my_bounding_box.ymax - ghost_surface_center_y) > length_tolerance:
            return False
        if abs(my_bounding_box.ymin - ghost_surface_center_y) > length_tolerance:
            return False
        return True

    def _will_be_deleted_by_mesher(
        # pylint: disable=too-many-arguments, too-many-return-statements, too-many-branches
        self,
        entity_transformation_detected: bool,
        farfield_method: Optional[
            Literal["auto", "quasi-3d", "quasi-3d-periodic", "user-defined", "wind-tunnel"]
        ],
        global_bounding_box: Optional[BoundingBoxType],
        planar_face_tolerance: Optional[float],
        half_model_symmetry_plane_center_y: Optional[float],
        quasi_3d_symmetry_planes_center_y: Optional[tuple[float, ...]],
        farfield_domain_type: Optional[str] = None,
    ) -> bool:
        """
        Check against the automated farfield method and
        determine if the current `Surface` will be deleted by the mesher.

        Returns False whenever the information needed for the check is missing
        (transformed entities, legacy schema, user-defined or wind-tunnel farfield).

        Raises
        ------
        ValueError
            If `farfield_method` is not one of the known methods.
        """
        if entity_transformation_detected:
            # If transformed then the following check will no longer be accurate
            # since we do not know the final bounding box for each surface and global model.
            return False

        if global_bounding_box is None or planar_face_tolerance is None or farfield_method is None:
            # VolumeMesh or Geometry/SurfaceMesh with legacy schema.
            return False

        length_tolerance = global_bounding_box.largest_dimension * planar_face_tolerance

        if farfield_domain_type in ("half_body_positive_y", "half_body_negative_y"):
            if self.private_attributes is not None:
                # pylint: disable=no-member
                y_min = self.private_attributes.bounding_box.ymin
                y_max = self.private_attributes.bounding_box.ymax

                # The surface lies entirely on the side that the half-body domain drops.
                if farfield_domain_type == "half_body_positive_y" and y_max < -length_tolerance:
                    return True

                if farfield_domain_type == "half_body_negative_y" and y_min > length_tolerance:
                    return True

        if farfield_method in ("user-defined", "wind-tunnel"):
            # Not applicable to user defined or wind tunnel farfield
            return False

        if farfield_method == "auto":
            if half_model_symmetry_plane_center_y is None:
                # Legacy schema.
                return False
            # Without a forced half-body domain the symmetry plane only exists when the
            # bounding box indicates a half model.
            if farfield_domain_type not in ("half_body_positive_y", "half_body_negative_y") and (
                not _auto_symmetric_plane_exists_from_bbox(
                    global_bounding_box=global_bounding_box,
                    planar_face_tolerance=planar_face_tolerance,
                )
            ):
                return False
            return self._overlaps(half_model_symmetry_plane_center_y, length_tolerance)

        if farfield_method in ("quasi-3d", "quasi-3d-periodic"):
            if quasi_3d_symmetry_planes_center_y is None:
                # Legacy schema.
                return False
            for plane_center_y in quasi_3d_symmetry_planes_center_y:
                if self._overlaps(plane_center_y, length_tolerance):
                    return True
            return False

        raise ValueError(f"Unknown auto farfield generation method: {farfield_method}.")


@final
class ImportedSurface(EntityBase):
    """ImportedSurface for post-processing"""

    private_attribute_entity_type_name: Literal["ImportedSurface"] = pd.Field(
        "ImportedSurface", frozen=True
    )
    private_attribute_sub_components: Optional[List[str]] = pd.Field(
        None, description="A list of sub components"
    )
    file_name: Optional[str] = None
    surface_mesh_id: Optional[str] = None

    @pd.model_validator(mode="after")
    def _populate_id_from_name(self) -> "ImportedSurface":
        """Ensure a deterministic private_attribute_id exists.

        CoordinateSystemManager and MirrorManager use private_attribute_id as a
        dict key for entity tracking. A deterministic id derived from name
        guarantees the same ImportedSurface always resolves to the same id,
        even when reconstructed from cloud metadata across sessions.
        """
        if self.private_attribute_id is None:
            # object.__setattr__ writes the attribute directly, bypassing the model's own __setattr__.
            object.__setattr__(self, "private_attribute_id", f"{self.name}_defaultBody")
        return self


class GhostSurface(_SurfaceEntityBase):
    """
    Represents a boundary surface that may or may not be generated therefore may or may not exist.
    It depends on the submitted geometry/Surface mesh. E.g. the symmetry plane in `AutomatedFarfield`.

    This is a token/place-holder used only on the Python API side.
    All `GhostSurface` entities will be replaced with exact entity instances before simulation.json submission.
    """

    name: str = pd.Field(frozen=True)

    private_attribute_entity_type_name: Literal["GhostSurface"] = pd.Field(
        "GhostSurface", frozen=True
    )


class WindTunnelGhostSurface(GhostSurface):
    """Wind tunnel boundary patches."""

    private_attribute_entity_type_name: Literal["WindTunnelGhostSurface"] = pd.Field(
        "WindTunnelGhostSurface", frozen=True
    )
    # For frontend: list of floor types that use this boundary patch, or ["all"]
    used_by: List[
        Literal["StaticFloor", "FullyMovingFloor", "CentralBelt", "WheelBelts", "all"]
    ] = pd.Field(default_factory=lambda: ["all"], frozen=True)

    def exists(self, _) -> bool:
        """Currently, .exists() is only called on automated farfield"""
        raise ValueError(".exists should not be called on wind tunnel farfield")


# pylint: disable=missing-class-docstring
@final
class GhostSphere(_SurfaceEntityBase):
    private_attribute_entity_type_name: Literal["GhostSphere"] = pd.Field(
        "GhostSphere", frozen=True
    )
    # Note: Making following optional since front end will not carry these over to assigned entities.
    center: Optional[List] = pd.Field(None, alias="center")
    max_radius: Optional[PositiveFloat] = pd.Field(None, alias="maxRadius")

    def exists(self, _) -> bool:
        """Ghost farfield always exists."""
        return True


# pylint: disable=missing-class-docstring
@final
class GhostCircularPlane(_SurfaceEntityBase):
    private_attribute_entity_type_name: Literal["GhostCircularPlane"] = pd.Field(
        "GhostCircularPlane", frozen=True
    )
    # Note: Making following optional since front end will not carry these over to assigned entities.
    center: Optional[List] = pd.Field(None, alias="center")
    max_radius: Optional[PositiveFloat] = pd.Field(None, alias="maxRadius")
    normal_axis: Optional[List] = pd.Field(None, alias="normalAxis")

    def _get_existence_dependency(self, validation_info):
        """Return `(y_min, y_max, tolerance, largest_dimension)` of the global bounding box.

        `largest_dimension` is the largest extent over the three coordinates and
        `tolerance` is that extent multiplied by `validation_info.planar_face_tolerance`.
        """
        y_max = validation_info.global_bounding_box[1][1]
        y_min = validation_info.global_bounding_box[0][1]

        largest_dimension = -np.inf
        for dim in range(3):
            dimension = (
                validation_info.global_bounding_box[1][dim]
                - validation_info.global_bounding_box[0][dim]
            )
            largest_dimension = max(largest_dimension, dimension)

        tolerance = largest_dimension * validation_info.planar_face_tolerance
        return y_min, y_max, tolerance, largest_dimension

    def exists(self, validation_info) -> bool:
        """For automated farfield, check mesher logic for symmetric plane existence."""

        if self.name != "symmetric":
            # Quasi-3D mode, no need to check existence.
            return True

        if validation_info is None:
            raise ValueError("Validation info is required for GhostCircularPlane existence check.")

        if validation_info.global_bounding_box is None:
            # This likely means the user try to use mesher on old cloud resources.
            # We cannot validate if symmetric exists so will let it pass. Pipeline will error out anyway.
            return True

        if validation_info.will_generate_forced_symmetry_plane():
            return True

        return _auto_symmetric_plane_exists_from_bbox(
            global_bounding_box=validation_info.global_bounding_box,
            planar_face_tolerance=validation_info.planar_face_tolerance,
        )

    def _per_entity_type_validation(self, param_info: ParamsValidationInfo):
        """Validate ghost surface existence and configuration."""
        # pylint: disable=import-outside-toplevel
        from flow360.component.simulation.validation.validation_utils import (
            check_symmetric_boundary_existence,
            check_user_defined_farfield_symmetry_existence,
        )

        # These functions expect a list, so wrap self
        check_user_defined_farfield_symmetry_existence([self], param_info)
        check_symmetric_boundary_existence([self], param_info)
        return self


class SurfacePairBase(Flow360BaseModel):
    """
    Base class for surface pair objects.
    Subclasses must define a `pair` attribute with the appropriate surface type.
    """

    pair: Tuple[_SurfaceEntityBase, _SurfaceEntityBase]

    @pd.field_validator("pair", mode="after")
    @classmethod
    def check_unique(cls, v):
        """Check if pairing with self."""
        if v[0].name == v[1].name:
            raise ValueError("A surface cannot be paired with itself.")
        return v

    @pd.model_validator(mode="before")
    @classmethod
    def _format_input(cls, input_data: Union[dict, list, tuple]):
        """Normalize raw input to `{"pair": ...}` before validation.

        A list/tuple is taken as the pair itself; a dict must carry a "pair" key
        (any other keys are dropped). Anything else raises `ValueError`.
        """
        if isinstance(input_data, (list, tuple)):
            return {"pair": input_data}
        # A dict without "pair" is reported as invalid input instead of escaping
        # validation as a bare KeyError.
        if isinstance(input_data, dict) and "pair" in input_data:
            return {"pair": input_data["pair"]}
        raise ValueError("Invalid input data.")

    # Hash, equality and string form are all based on the sorted surface names, so a
    # pair is independent of the order of its two members.
    def __hash__(self):
        return hash(tuple(sorted([self.pair[0].name, self.pair[1].name])))

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return tuple(sorted([self.pair[0].name, self.pair[1].name])) == tuple(
                sorted([other.pair[0].name, other.pair[1].name])
            )
        return False

    def __str__(self):
        return ",".join(sorted([self.pair[0].name, self.pair[1].name]))


class SnappyBody(EntityBase):
    """
    Represents a group of faces forming a body for snappyHexMesh.
    Bodies and their regions are defined in the ASCII STL file by using the solid -> endsolid"
    keywords with a body::region naming scheme.
    """

    private_attribute_entity_type_name: Literal["SnappyBody"] = pd.Field("SnappyBody", frozen=True)
    private_attribute_id: str = pd.Field(
        default_factory=generate_uuid,
        frozen=True,
        description="Unique identifier for the entity. Used by front end to track entities and enable auto update etc.",
    )

    surfaces: List[Surface] = pd.Field()

    def __getitem__(self, key: str):
        """Return the surfaces of this body whose name matches `key` (name or naming pattern).

        The pattern is prefixed with "<body name>::" unless the body holds a single
        surface whose name contains no "::".

        Raises
        ------
        KeyError
            If no surface matches.
        """
        if len(self.surfaces) == 1 and ("::" not in self.surfaces[0].name):
            regex = _naming_pattern_handler(pattern=key)
        else:
            regex = _naming_pattern_handler(pattern=f"{self.name}::{key}")

        matched_surfaces = [entity for entity in self.surfaces if regex.match(entity.name)]
        if not matched_surfaces:
            raise KeyError(
                f"No entity found in registry for parent entity: {self.name} with given name/naming pattern: '{key}'."
            )
        return matched_surfaces
@final
class SeedpointVolume(_VolumeEntityBase):
    """
    Represents a separate zone in the mesh, defined by a point inside it.
    To be used only with snappyHexMesh.
    """

    # pylint: disable=no-member
    private_attribute_entity_type_name: Literal["SeedpointVolume"] = pd.Field(
        "SeedpointVolume", frozen=True
    )
    type: Literal["SeedpointVolume"] = pd.Field("SeedpointVolume", frozen=True)
    point_in_mesh: LengthType.Point = pd.Field(
        description="Seedpoint for a main fluid zone in snappyHexMesh."
    )
    axes: Optional[OrthogonalAxes] = pd.Field(
        None, description="Principal axes definition when using with PorousMedium"
    )  # Porous media support
    axis: Optional[Axis] = pd.Field(None)  # Rotation support
    center: Optional[LengthType.Point] = pd.Field(None, description="")  # Rotation support
    private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)

    def _per_entity_type_validation(self, param_info: ParamsValidationInfo):
        """
        Check that this volume's name is among the custom volumes to be generated.

        Returns ``self`` on success and raises ``ValueError`` otherwise.
        """
        if self.name in param_info.to_be_generated_custom_volumes:
            return self

        message = (
            f"SeedpointVolume {self.name} is not listed under meshing->volume_zones(or zones)"
            "->CustomZones."
        )
        raise ValueError(message)
# Union of the accepted volume entity kinds; a plain string is accepted as well.
VolumeEntityTypes = Union[GenericVolume, Cylinder, Box, str]


class SurfacePair(SurfacePairBase):
    """
    Represents a pair of surfaces.

    Attributes:
        pair (Tuple[Surface, Surface]): A tuple containing two Surface objects representing the pair.
    """

    # Narrows the base class `pair` field to concrete Surface entities.
    pair: Tuple[Surface, Surface]


class GhostSurfacePair(SurfacePairBase):
    """
    Represents a pair of ghost surfaces.

    Attributes:
        pair (Tuple[GhostSurfaceType, GhostSurfaceType]):
            A tuple containing two GhostSurfaceType objects representing the pair.
            GhostSurface is for Python API, GhostCircularPlane is for Web UI.
    """

    # Discriminated union: the concrete ghost surface class is selected by the value of
    # `private_attribute_entity_type_name`. Declared as a ClassVar, so it is a type alias
    # rather than a model field.
    GhostSurfaceType: ClassVar[type] = Annotated[
        Union[GhostSurface, GhostCircularPlane],
        pd.Field(discriminator="private_attribute_entity_type_name"),
    ]

    # Narrows the base class `pair` field to ghost surface entities.
    pair: Tuple[GhostSurfaceType, GhostSurfaceType]
@final
class CustomVolume(_VolumeEntityBase):
    """
    CustomVolume is a volume zone defined by its surrounding surfaces.
    It will be generated by the volume mesher.
    """

    private_attribute_entity_type_name: Literal["CustomVolume"] = pd.Field(
        "CustomVolume", frozen=True
    )
    boundaries: EntityList[Surface, WindTunnelGhostSurface] = pd.Field(
        description="The surfaces that define the boundaries of the custom volume."
    )
    private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)
    axes: Optional[OrthogonalAxes] = pd.Field(None, description="")  # Porous media support
    axis: Optional[Axis] = pd.Field(None)  # Rotation support
    # pylint: disable=no-member
    center: Optional[LengthType.Point] = pd.Field(None, description="")  # Rotation support

    @contextual_field_validator("boundaries", mode="after")
    @classmethod
    def ensure_unique_boundary_names(cls, v, param_info: ParamsValidationInfo):
        """Reject a boundary list in which two expanded entities share a name."""
        entities = param_info.expand_entity_list(v)
        distinct_names = {entity.name for entity in entities}
        if len(entities) != len(distinct_names):
            raise ValueError("The boundaries of a CustomVolume must have different names.")
        return v

    @contextual_model_validator(mode="after")
    def ensure_beta_mesher_and_compatible_farfield(self, param_info: ParamsValidationInfo):
        """Require the beta mesher together with a user-defined or wind tunnel farfield."""
        supported_farfields = ("user-defined", "wind-tunnel")
        if not param_info.is_beta_mesher or param_info.farfield_method not in supported_farfields:
            raise ValueError(
                "CustomVolume is supported only when the beta mesher is enabled "
                "and either a user-defined farfield or a wind tunnel farfield is enabled."
            )
        return self

    def _apply_transformation(self, matrix: np.ndarray) -> "CustomVolume":
        """
        Return a copy whose ``axes`` are rotated by the rotation part of ``matrix``.

        Only the rotation extracted from ``matrix`` is applied. ``self`` is returned
        unchanged when no axes are set.
        """
        if self.axes is None:
            # Nothing to rotate.
            return self

        # Keep only the rotation; translation and scaling do not affect the axes.
        rotation = _extract_rotation_matrix(matrix)
        rotated_axes = tuple(
            tuple(rotation @ np.asarray(axis)) for axis in (self.axes[0], self.axes[1])
        )
        return self.model_copy(update={"axes": rotated_axes})

    def _per_entity_type_validation(self, param_info: ParamsValidationInfo):
        """
        Check that this volume's name is among the custom volumes to be generated.

        Returns ``self`` on success and raises ``ValueError`` otherwise.
        """
        if self.name in param_info.to_be_generated_custom_volumes:
            return self

        message = (
            f"CustomVolume {self.name} is not listed under meshing->volume_zones(or zones)"
            "->CustomZones."
        )
        raise ValueError(message)
class _MirroredEntityBase(EntityBase, metaclass=ABCMeta):
    """
    Base class for mirrored entities (MirroredSurface, MirroredGeometryBodyGroup).

    Provides common validation logic for checking source entity and mirror plane existence.
    """

    mirror_plane_id: str = pd.Field(description="ID of the mirror plane.")

    @property
    @abstractmethod
    def source_entity_id_field_name(self) -> str:
        """Name of the attribute on this entity that stores the source entity ID."""

    @property
    @abstractmethod
    def source_entity_type_name(self) -> str:
        """Entity type name under which the source entity is looked up."""

    def _manual_assignment_validation(self, param_info: ParamsValidationInfo):
        """
        Check that the source entity and the mirror plane can still be found.

        Returns ``self`` when no registry is available or both lookups succeed. When a
        lookup fails, a validation warning is recorded and ``None`` is returned.
        """
        # pylint: disable=import-outside-toplevel
        from flow360.component.simulation.validation.validation_context import (
            add_validation_warning,
        )

        registry = param_info.get_entity_registry()
        if registry is None:
            return self

        # Subclasses name the field that carries the source ID and the source type.
        source_id = getattr(self, self.source_entity_id_field_name)
        source_type = self.source_entity_type_name
        owner = f"{self.__class__.__name__} '{self.name}'"

        found_source = registry.find_by_type_name_and_id(
            entity_type=source_type, entity_id=source_id
        )
        if found_source is None:
            add_validation_warning(
                f"{owner} references non-existent source "
                f"{source_type.lower()} (id={source_id}). "
                "This entity will be removed."
            )
            return None

        found_plane = registry.find_by_type_name_and_id(
            entity_type="MirrorPlane", entity_id=self.mirror_plane_id
        )
        if found_plane is None:
            add_validation_warning(
                f"{owner} references non-existent mirror plane "
                f"(id={self.mirror_plane_id}). This entity will be removed."
            )
            return None

        return self


class MirroredSurface(_SurfaceEntityBase, _MirroredEntityBase):
    """
    :class:`MirroredSurface` class for representing a mirrored surface.
    """

    name: str = pd.Field()
    surface_id: str = pd.Field(
        description="ID of the original surface being mirrored.", frozen=True
    )
    mirror_plane_id: str = pd.Field(description="ID of the mirror plane to mirror the surface.")

    private_attribute_entity_type_name: Literal["MirroredSurface"] = pd.Field(
        "MirroredSurface", frozen=True
    )
    private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)

    # Private attribute used for draft-only bookkeeping. This must NOT affect schema or serialization.
    _geometry_body_group_id: Optional[str] = pd.PrivateAttr(default=None)

    @property
    def source_entity_id_field_name(self) -> str:
        """The source ID of a mirrored surface is stored in ``surface_id``."""
        return "surface_id"

    @property
    def source_entity_type_name(self) -> str:
        """The source of a mirrored surface is looked up as a ``Surface``."""
        return "Surface"


class MirroredGeometryBodyGroup(_MirroredEntityBase):
    """
    :class:`MirroredGeometryBodyGroup` class for representing a mirrored geometry body group.
    """

    name: str = pd.Field()
    geometry_body_group_id: str = pd.Field(description="ID of the geometry body group to mirror.")
    mirror_plane_id: str = pd.Field(
        description="ID of the mirror plane to mirror the geometry body group."
    )

    private_attribute_entity_type_name: Literal["MirroredGeometryBodyGroup"] = pd.Field(
        "MirroredGeometryBodyGroup", frozen=True
    )
    private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True)

    @property
    def source_entity_id_field_name(self) -> str:
        """The source ID of a mirrored body group is stored in ``geometry_body_group_id``."""
        return "geometry_body_group_id"

    @property
    def source_entity_type_name(self) -> str:
        """The source of a mirrored body group is looked up as a ``GeometryBodyGroup``."""
        return "GeometryBodyGroup"