Source code for flow360_schema.models.entities.volume_entities

"""Volume entity definitions.

Box, VoxelGrid, Cylinder, Sphere, AxisymmetricBody, GenericVolume,
CustomVolume, SeedpointVolume.
"""

# mypy: disable-error-code="import-not-found"

from __future__ import annotations

import logging
from typing import Any, Literal, final

import pydantic as pd
import unyt
from typing_extensions import Self

from flow360_schema.framework.base_model import Flow360BaseModel
from flow360_schema.framework.entity.entity_list import EntityList
from flow360_schema.framework.entity.entity_operation import (
    _extract_rotation_matrix,
    _rotation_matrix_to_axis_angle,
    _transform_direction,
    _transform_point,
    _validate_uniform_scale_and_transform_center,
    rotation_matrix_from_axis_and_angle,
)
from flow360_schema.framework.entity.entity_utils import generate_uuid
from flow360_schema.framework.entity.geometric_types import Axis
from flow360_schema.framework.multi_constructor_model_base import MultiConstructorBaseModel
from flow360_schema.framework.physical_dimensions import Angle, Length
from flow360_schema.framework.validation.context import add_validation_warning
from flow360_schema.framework.validation.validators import (
    contextual_field_validator,
    contextual_model_validator,
)
from flow360_schema.models.simulation.framework.updater_utils import deprecation_reminder

from .base import OrthogonalAxes, _VolumeEntityBase
from .surface_entities import Surface

logger = logging.getLogger(__name__)

NDArray = Any


class BoxCache(Flow360BaseModel):
    """Cached computation results for Box orientation."""

    # `axes` always exists as it needs to be used. More like storage than input cache.
    axes: OrthogonalAxes | None = pd.Field(None)
    center: Length.Vector3 | None = pd.Field(None)  # type: ignore[valid-type]
    size: Length.PositiveVector3 | None = pd.Field(None)  # type: ignore[valid-type]
    name: str | None = pd.Field(None)


[docs] @final class Box(MultiConstructorBaseModel, _VolumeEntityBase): """ Box in three-dimensional space, defined by center, size, and rotation. Example ------- >>> Box( ... name="box", ... axis_of_rotation=(1, 0, 0), ... angle_of_rotation=45 * unyt.deg, ... center=(1, 1, 1) * unyt.m, ... size=(0.2, 0.3, 2) * unyt.m, ... ) Define a box using principal axes: >>> Box.from_principal_axes( ... name="box", ... axes=[(0, 1, 0), (0, 0, 1)], ... center=(0, 0, 0) * unyt.m, ... size=(0.2, 0.3, 2) * unyt.m, ... ) """ type_name: Literal["Box"] = pd.Field("Box", frozen=True) # type: ignore[assignment] center: Length.Vector3 = pd.Field(description="The coordinates of the center of the box.") # type: ignore[valid-type] size: Length.PositiveVector3 = pd.Field(description="The dimensions of the box (length, width, height).") # type: ignore[valid-type] axis_of_rotation: Axis = pd.Field( # type: ignore[assignment] default=(0, 0, 1), description="The rotation axis. Cannot change once specified.", frozen=True, ) angle_of_rotation: Angle.Float64 = pd.Field( # type: ignore[valid-type] default=0 * unyt.degree, description="The rotation angle. Cannot change once specified.", frozen=True, ) private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True) private_attribute_input_cache: BoxCache = pd.Field(BoxCache(), frozen=True) # type: ignore[call-arg] private_attribute_entity_type_name: Literal["Box"] = pd.Field("Box", frozen=True)
[docs] @MultiConstructorBaseModel.model_constructor @pd.validate_call def from_principal_axes( cls, name: str, center: Length.Vector3, # type: ignore[valid-type] size: Length.PositiveVector3, # type: ignore[valid-type] axes: OrthogonalAxes, ) -> Self: """Construct box from principal axes.""" import numpy as np x_axis, y_axis = np.array(axes[0]), np.array(axes[1]) z_axis = np.cross(x_axis, y_axis) rotation_matrix = np.transpose(np.asarray([x_axis, y_axis, z_axis], dtype=np.float64)) # Calculate the rotation axis using eigenvalue decomposition eigvals, eigvecs = np.linalg.eig(rotation_matrix) axis = np.real(eigvecs[:, np.where(np.isreal(eigvals))]) if axis.shape[2] > 1: # in case of 0 rotation angle axis = axis[:, :, 0] axis = np.ndarray.flatten(axis) angle = np.sum(abs(np.angle(eigvals))) / 2 # Find correct angle sign matrix_test = rotation_matrix_from_axis_and_angle(axis, angle) angle *= -1 if np.isclose(rotation_matrix[0, :] @ matrix_test[:, 0], 1) else 1 return cls( # type: ignore[operator, no-any-return] name=name, center=center, size=size, axis_of_rotation=tuple(axis), angle_of_rotation=angle * unyt.rad, )
@pd.model_validator(mode="after") def _convert_axis_and_angle_to_coordinate_axes(self) -> Self: """Convert axis-angle orientation to coordinate axes representation.""" import numpy as np if not self.private_attribute_input_cache.axes: axis = np.asarray(self.axis_of_rotation, dtype=np.float64) angle = self.angle_of_rotation.to("rad").v.item() # type: ignore[attr-defined] axis = axis / np.linalg.norm(axis) rotation_matrix = rotation_matrix_from_axis_and_angle(axis, angle) self.private_attribute_input_cache.axes = np.transpose(rotation_matrix[:, :2]).tolist() return self @property def axes(self) -> OrthogonalAxes | None: """Return the axes that the box is aligned with.""" return self.private_attribute_input_cache.axes @pd.field_validator("center", "size", mode="after") @classmethod def _update_input_cache(cls, value: Any, info: pd.ValidationInfo) -> Any: setattr(info.data["private_attribute_input_cache"], info.field_name, value) # type: ignore[arg-type] return value def _apply_transformation(self, matrix: NDArray) -> Self: """Apply 3x4 transformation matrix with uniform scale validation and rotation composition.""" import numpy as np new_center, uniform_scale = _validate_uniform_scale_and_transform_center(matrix, self.center, "Box") # Combine rotations: existing rotation + transformation rotation existing_axis = np.asarray(self.axis_of_rotation, dtype=np.float64) existing_axis = existing_axis / np.linalg.norm(existing_axis) existing_angle = self.angle_of_rotation.to("rad").v.item() # type: ignore[attr-defined] rot_existing = rotation_matrix_from_axis_and_angle(existing_axis, existing_angle) rot_transform = _extract_rotation_matrix(matrix) rot_combined = rot_transform @ rot_existing new_axis, new_angle = _rotation_matrix_to_axis_angle(rot_combined) new_size = self.size * uniform_scale return self.model_copy( update={ "center": new_center, "axis_of_rotation": tuple(new_axis), "angle_of_rotation": new_angle * unyt.rad, "size": new_size, } )
@final class VoxelGrid(_VolumeEntityBase): """ Axis-aligned voxelized region used as a render target for direct volume rendering. Resolution is intrinsic to the voxelized 3D-texture entity (it shapes the underlying data, not just the rendered image), so it lives on the entity rather than in a render-side settings class. The grid is strictly axis-aligned — rotation and scene transforms are not supported on the schema side; the renderer applies any scene-level transform at draw time. Example ------- >>> VoxelGrid( ... name="grid", ... center=(0, 0, 0) * unyt.m, ... size=(1, 1, 1) * unyt.m, ... resolution=(256, 256, 256), ... ) """ center: Length.Vector3 = pd.Field(description="The coordinates of the center of the region.") # type: ignore[valid-type] size: Length.PositiveVector3 = pd.Field(description="The dimensions of the region (length, width, height).") # type: ignore[valid-type] private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True) private_attribute_entity_type_name: Literal["VoxelGrid"] = pd.Field("VoxelGrid", frozen=True) resolution: tuple[pd.PositiveInt, pd.PositiveInt, pd.PositiveInt] = pd.Field( description=( "Voxel grid dimensions (X, Y, Z). Memory cost scales as X*Y*Z*4 bytes; " "512^3 is a reasonable preview cap, 1024^3 a final-render cap." ), )
[docs] @final class Sphere(_VolumeEntityBase): """ Sphere in three-dimensional space. Example ------- >>> Sphere( ... name="sphere_zone", ... center=(0, 0, 0) * unyt.m, ... radius=1.5 * unyt.m, ... axis=(0, 0, 1), ... ) """ private_attribute_entity_type_name: Literal["Sphere"] = pd.Field("Sphere", frozen=True) center: Length.Vector3 = pd.Field(description="The center point of the sphere.") # type: ignore[valid-type] radius: Length.PositiveFloat64 = pd.Field(description="The radius of the sphere.") # type: ignore[valid-type] axis: Axis = pd.Field( # type: ignore[assignment] default=(0, 0, 1), description="The axis of rotation for the sphere (used in sliding interfaces).", ) private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True) def _apply_transformation(self, matrix: NDArray) -> Self: """Apply 3x4 transformation matrix with uniform scale validation.""" import numpy as np new_center, uniform_scale = _validate_uniform_scale_and_transform_center(matrix, self.center, "Sphere") axis_array = np.asarray(self.axis) transformed_axis = _transform_direction(axis_array, matrix) new_axis = tuple(transformed_axis / np.linalg.norm(transformed_axis)) new_radius = self.radius * uniform_scale return self.model_copy( update={ "center": new_center, "axis": new_axis, "radius": new_radius, } )
[docs] @final class Cylinder(_VolumeEntityBase): """ Cylinder in three-dimensional space. Example ------- >>> Cylinder( ... name="bet_disk_volume", ... center=(0, 0, 0) * unyt.inch, ... axis=(0, 0, 1), ... outer_radius=150 * unyt.inch, ... height=15 * unyt.inch, ... ) """ private_attribute_entity_type_name: Literal["Cylinder"] = pd.Field("Cylinder", frozen=True) axis: Axis = pd.Field(description="The axis of the cylinder.") center: Length.Vector3 = pd.Field(description="The center point of the cylinder.") # type: ignore[valid-type] height: Length.PositiveFloat64 = pd.Field(description="The height of the cylinder.") # type: ignore[valid-type] inner_radius: Length.NonNegativeFloat64 | None = pd.Field( # type: ignore[valid-type] 0 * unyt.m, description="The inner radius of the cylinder." ) outer_radius: Length.PositiveFloat64 = pd.Field(description="The outer radius of the cylinder.") # type: ignore[valid-type] private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True) @pd.model_validator(mode="after") def _check_inner_radius_is_less_than_outer_radius(self) -> Self: if self.inner_radius is not None and self.inner_radius >= self.outer_radius: raise ValueError( f"Cylinder inner radius ({self.inner_radius}) must be less than outer radius ({self.outer_radius})." ) return self def _apply_transformation(self, matrix: NDArray) -> Self: """Apply 3x4 transformation matrix with uniform scale validation.""" import numpy as np new_center, uniform_scale = _validate_uniform_scale_and_transform_center(matrix, self.center, "Cylinder") axis_array = np.asarray(self.axis) transformed_axis = _transform_direction(axis_array, matrix) new_axis = tuple(transformed_axis / np.linalg.norm(transformed_axis)) new_height = self.height * uniform_scale new_outer_radius = self.outer_radius * uniform_scale new_inner_radius = self.inner_radius * uniform_scale if self.inner_radius is not None else None return self.model_copy( update={ "center": new_center, "axis": new_axis, "height": new_height, "outer_radius": new_outer_radius, "inner_radius": new_inner_radius, } )
@final class AxisymmetricSegment(Flow360BaseModel): """Reference to the region bounded by a segment of an AxisymmetricBody profile curve.""" model_config = pd.ConfigDict(frozen=True) type_name: Literal["AxisymmetricSegment"] = pd.Field("AxisymmetricSegment", frozen=True) owning_entity_id: str = pd.Field(description="The private_attribute_id of the owning AxisymmetricBody.") index: int = pd.Field(ge=0, description="Index along the profile curve (0-based).")
[docs] @final class AxisymmetricBody(_VolumeEntityBase): """ Generic body of revolution, represented as (axial, radial) profile polyline with arbitrary center and axial direction. First and last profile samples must connect to axis (radius = 0). Example ------- >>> AxisymmetricBody( ... name="cone_frustum_body", ... center=(0, 0, 0) * unyt.inch, ... axis=(0, 0, 1), ... profile_curve=[ ... (-1, 0) * unyt.inch, ... (-1, 1) * unyt.inch, ... (1, 2) * unyt.inch, ... (1, 0) * unyt.inch, ... ], ... ) """ private_attribute_entity_type_name: Literal["AxisymmetricBody"] = pd.Field("AxisymmetricBody", frozen=True) axis: Axis = pd.Field(description="The axis of the body of revolution.") center: Length.Vector3 = pd.Field(description="The center point of the body of revolution.") # type: ignore[valid-type] profile_curve: list[Length.Vector2] = pd.Field( # type: ignore[valid-type] description="The (Axial, Radial) profile of the body of revolution.", min_length=2, frozen=True, # ensure AxisymmetricSegment references are immutable ) private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True) @pd.field_validator("profile_curve", mode="after") @classmethod def _check_radial_profile_is_positive(cls, curve: list[Length.Vector2]) -> list[Length.Vector2]: # type: ignore[valid-type] first_point = curve[0] if first_point[1] != 0: # type: ignore[index] raise ValueError( f"Expect first profile sample to be (Axial, 0.0). Found invalid point: {str(first_point)}." ) last_point = curve[-1] if last_point[1] != 0: # type: ignore[index] raise ValueError(f"Expect last profile sample to be (Axial, 0.0). Found invalid point: {str(last_point)}.") for profile_point in curve[1:-1]: if profile_point[1] < 0: # type: ignore[index] raise ValueError( f"Expect profile samples to be (Axial, Radial) samples with positive Radial." f" Found invalid point: {str(profile_point)}." ) return curve @pd.field_validator("profile_curve", mode="after") @classmethod def _check_profile_curve_has_no_duplicates(cls, curve: list[Length.Vector2]) -> list[Length.Vector2]: # type: ignore[valid-type] for i in range(len(curve) - 1): p1, p2 = curve[i], curve[i + 1] if p1[0] == p2[0] and p1[1] == p2[1]: # type: ignore[index] raise ValueError( f"Profile curve has duplicate consecutive points at indices {i} and {i + 1}: {str(p1)}." ) return curve
[docs] def segment(self, index: int) -> AxisymmetricSegment: """Return an AxisymmetricSegment reference for the given profile curve segment index.""" num_segments = len(self.profile_curve) - 1 if index < 0 or index >= num_segments: raise IndexError(f"Segment index {index} out of range [0, {num_segments - 1}]") return AxisymmetricSegment(owning_entity_id=self.private_attribute_id, index=index) # type: ignore[call-arg]
def _apply_transformation(self, matrix: NDArray) -> Self: """Apply 3x4 transformation matrix with uniform scale validation.""" import numpy as np new_center, uniform_scale = _validate_uniform_scale_and_transform_center( matrix, self.center, "AxisymmetricBody" ) axis_array = np.asarray(self.axis) transformed_axis = _transform_direction(axis_array, matrix) new_axis = tuple(transformed_axis / np.linalg.norm(transformed_axis)) new_profile_curve = [] for point in self.profile_curve: point_array = np.asarray(point.value) # type: ignore[attr-defined] scaled_point_array = point_array * uniform_scale new_profile_curve.append(type(point)(scaled_point_array, point.units)) # type: ignore[attr-defined, misc] return self.model_copy( update={ "center": new_center, "axis": new_axis, "profile_curve": new_profile_curve, } )
@final class GenericVolume(_VolumeEntityBase): """ Auto-constructed volume entity from uploaded volume mesh metadata. Not exposed to end users. Contains only basic connectivity/mesh information. """ private_attribute_entity_type_name: Literal["GenericVolume"] = pd.Field("GenericVolume", frozen=True) axes: OrthogonalAxes | None = pd.Field(None, description="") # Porous media support axis: Axis | None = pd.Field(None) # Rotation support center: Length.Vector3 | None = pd.Field(None, description="") # type: ignore[valid-type] # Rotation support
[docs] @final class CustomVolume(_VolumeEntityBase): """Volume zone defined by its bounding entities, generated by the volume mesher.""" private_attribute_entity_type_name: Literal["CustomVolume"] = pd.Field("CustomVolume", frozen=True) bounding_entities: EntityList[Surface, Cylinder, AxisymmetricBody, Sphere] = pd.Field( # type: ignore[type-arg] description="The entities that define the boundaries of the custom volume." ) private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True) axes: OrthogonalAxes | None = pd.Field(None, description="") # Porous media support axis: Axis | None = pd.Field(None) # Rotation support center: Length.Vector3 | None = pd.Field(None, description="") # type: ignore[valid-type] # Rotation support @pd.model_validator(mode="before") @classmethod def _rename_boundaries_to_bounding_entities(cls, value: Any) -> Any: """Accept the legacy ``boundaries`` key and migrate to ``bounding_entities``.""" if not isinstance(value, dict): return value if "boundaries" in value and "bounding_entities" not in value: value["bounding_entities"] = value.pop("boundaries") add_validation_warning( "`CustomVolume.boundaries` has been renamed to `bounding_entities`. " "Please update your code to use `bounding_entities`." ) return value @contextual_field_validator("bounding_entities", mode="after") @classmethod def ensure_unique_boundary_names( cls, v: Any, param_info: Any, ) -> Any: """Check if the bounding entities have different names within a CustomVolume.""" expanded_surfaces = param_info.expand_entity_list(v) if len(expanded_surfaces) != len({entity.name for entity in expanded_surfaces}): raise ValueError("The bounding entities of a CustomVolume must have different names.") return v @contextual_field_validator("bounding_entities", mode="after") @classmethod def _validate_bounding_entity_existence(cls, value: Any, param_info: Any) -> Any: """Ensure all boundaries will be present after mesher.""" from flow360_schema.models.simulation.validation.validation_utils import ( validate_entity_list_surface_existence, ) return validate_entity_list_surface_existence(value, param_info) # type: ignore[no-untyped-call] @contextual_model_validator(mode="after") def ensure_beta_mesher_and_compatible_farfield(self, param_info: Any) -> Self: """Check if the beta mesher is enabled and that the user is using a compatible farfield.""" if param_info.is_beta_mesher and param_info.farfield_method in ( "user-defined", "wind-tunnel", "auto", ): return self raise ValueError( "CustomVolume is supported only when the beta mesher is enabled " "and an automated, user-defined, or wind tunnel farfield is enabled." ) def _apply_transformation(self, matrix: NDArray) -> Self: """Apply rotation from transformation matrix to axes only (no translation or scaling).""" import numpy as np if self.axes is None: return self rotation_matrix = _extract_rotation_matrix(matrix) x_axis_array = np.asarray(self.axes[0]) y_axis_array = np.asarray(self.axes[1]) new_x_axis = rotation_matrix @ x_axis_array new_y_axis = rotation_matrix @ y_axis_array new_axes = (tuple(new_x_axis), tuple(new_y_axis)) return self.model_copy(update={"axes": new_axes}) def _per_entity_type_validation(self, param_info: Any) -> Self: """Validate that CustomVolume is listed in meshing->volume_zones.""" if self.name not in param_info.to_be_generated_custom_volumes: raise ValueError( f"CustomVolume {self.name} is not listed under meshing->volume_zones(or zones)->CustomZones." ) return self
[docs] @final class SeedpointVolume(_VolumeEntityBase): """ Separate zone in the mesh, defined by one or more interior seed points. To be used only with snappyHexMesh. """ private_attribute_entity_type_name: Literal["SeedpointVolume"] = pd.Field("SeedpointVolume", frozen=True) type: Literal["SeedpointVolume"] = pd.Field("SeedpointVolume", frozen=True) point_in_mesh: list[Length.Vector3] = pd.Field( # type: ignore[valid-type] min_length=1, description=( "Seed point(s) for this custom volume zone. Accepts either one [x, y, z] point or a " "list of points [[x, y, z], ...]. Use with Snappy requires exactly one point per zone." ), ) axes: OrthogonalAxes | None = pd.Field( None, description="Principal axes definition when using with PorousMedium" ) # Porous media support axis: Axis | None = pd.Field(None) # Rotation support center: Length.Vector3 | None = pd.Field(None, description="") # type: ignore[valid-type] # Rotation support private_attribute_id: str = pd.Field(default_factory=generate_uuid, frozen=True) @pd.field_validator("point_in_mesh", mode="before") @classmethod @deprecation_reminder("25.11.0") # type: ignore[untyped-decorator] def _normalize_point_in_mesh(cls, value: Any) -> Any: """Accept a legacy single `[x, y, z]` point and wrap it as `[[x, y, z]]`. Slated for removal in 25.11 once the updater (added in 25.10.13) has migrated all stored configurations to the list-of-points form. """ try: single_point = pd.TypeAdapter(Length.Vector3).validate_python(value) except Exception: # pylint: disable=broad-exception-caught return value logger.warning( "SeedpointVolume.point_in_mesh as a single `[x, y, z]` is deprecated and " "will be removed in 25.11. Use `[[x, y, z], ...]` instead." ) return [single_point] def _per_entity_type_validation(self, param_info: Any) -> Self: """Validate that SeedpointVolume is listed in meshing->volume_zones.""" if self.name not in param_info.to_be_generated_custom_volumes: raise ValueError( f"SeedpointVolume {self.name} is not listed under meshing->volume_zones(or zones)->CustomZones." ) return self def _apply_transformation(self, matrix: NDArray) -> Self: """Apply 3x4 transformation matrix to each seed point.""" import numpy as np new_points = [] for point in self.point_in_mesh: point_array = np.asarray(point.value) # type: ignore[attr-defined] new_point_array = _transform_point(point_array, matrix) new_points.append(type(point)(new_point_array, point.units)) # type: ignore[attr-defined, misc] return self.model_copy(update={"point_in_mesh": new_points})