"""
Geometry component
"""
from __future__ import annotations
import os
import threading
from enum import Enum
from typing import Any, List, Literal, Optional, Union
import pydantic as pd
from flow360.cloud.flow360_requests import (
GeometryFileMeta,
LengthUnitType,
NewGeometryDependencyRequest,
NewGeometryRequest,
)
from flow360.cloud.heartbeat import post_upload_heartbeat
from flow360.cloud.rest_api import RestApi
from flow360.component.interfaces import GeometryInterface
from flow360.component.resource_base import (
AssetMetaBaseModelV2,
Flow360Resource,
ResourceDraft,
SubmissionMode,
)
from flow360.component.simulation.folder import Folder
from flow360.component.simulation.primitives import Edge, GeometryBodyGroup, Surface
from flow360.component.simulation.unit_system import LengthType
from flow360.component.simulation.utils import model_attribute_unlock
from flow360.component.simulation.web.asset_base import AssetBase
from flow360.component.utils import (
GeometryFiles,
MeshNameParser,
shared_account_confirm_proceed,
)
from flow360.exceptions import Flow360FileError, Flow360ValueError
from flow360.log import log
class GeometryStatus(Enum):
"""Status of geometry resource, the is_final method is overloaded"""
ERROR = "error"
UPLOADED = "uploaded"
UPLOADING = "uploading"
RUNNING = "running"
GENERATING = "generating"
PROCESSED = "processed"
DELETED = "deleted"
PENDING = "pending"
UNKNOWN = "unknown"
def is_final(self):
"""
Checks if status is final for geometry resource
Returns
-------
bool
True if status is final, False otherwise.
"""
if self in [
GeometryStatus.ERROR,
GeometryStatus.PROCESSED,
GeometryStatus.DELETED,
]:
return True
return False
# pylint: disable=R0801
class GeometryMeta(AssetMetaBaseModelV2):
"""
GeometryMeta component
"""
status: GeometryStatus = pd.Field() # Overshadowing to ensure correct is_final() method
dependency: bool = pd.Field(False)
class GeometryDraft(ResourceDraft):
"""
Unified Geometry Draft component for uploading geometry files.
This class handles both:
- Creating a new project with geometry as the root asset
- Adding geometry as a dependency to an existing project
The submission mode is determined by how the draft is created (via factory methods
on the Geometry class) and affects the behavior of the submit() method.
All geometries are conceptually equivalent - they are components that can be used
to create the final geometry for simulation. The distinction between "root" and
"dependency" is only about where the geometry is uploaded (new project vs existing
project), not about any fundamental difference in the geometry itself.
"""
# pylint: disable=too-many-arguments, too-many-instance-attributes
def __init__(
self,
file_names: Union[List[str], str],
project_name: str = None,
solver_version: str = None,
length_unit: LengthUnitType = "m",
tags: List[str] = None,
folder: Optional[Folder] = None,
):
"""
Initialize a GeometryDraft with common attributes.
For creating a new project (root geometry):
Use Geometry.from_file() which sets project_name, solver_version, folder
For adding to existing project (dependency geometry):
Use Geometry.import_to_project() which sets the dependency context
Parameters
----------
file_names : Union[List[str], str]
Path(s) to the geometry file(s)
length_unit : LengthUnitType, optional
Unit of length (default is "m")
tags : List[str], optional
Tags to assign to the geometry (default is None)
project_name : str, optional
Name of the project (for project root mode)
solver_version : str, optional
Solver version (for project root mode)
folder : Optional[Folder], optional
Parent folder (for project root mode)
"""
self._file_names = file_names
self.project_name = project_name
self.tags = tags if tags is not None else []
self.length_unit = length_unit
self.solver_version = solver_version
self.folder = folder
# pylint: disable=fixme
# TODO: create a DependableResourceDraft for GeometryDraft and SurfaceMeshDraft
self.dependency_name = None
self.dependency_project_id = None
self._submission_mode: SubmissionMode = SubmissionMode.PROJECT_ROOT
self._validate_geometry()
ResourceDraft.__init__(self)
def _validate_geometry(self):
"""Validate geometry files and length unit."""
if not isinstance(self.file_names, list) or len(self.file_names) == 0:
raise Flow360FileError("file_names field has to be a non-empty list.")
try:
GeometryFiles(file_names=self.file_names)
except pd.ValidationError as e:
raise Flow360FileError(str(e)) from e
for geometry_file in self.file_names:
if not os.path.exists(geometry_file):
raise Flow360FileError(f"{geometry_file} not found.")
if self.length_unit not in LengthUnitType.__args__:
raise Flow360ValueError(
f"specified length_unit : {self.length_unit} is invalid. "
f"Valid options are: {list(LengthUnitType.__args__)}"
)
def _set_default_project_name(self):
"""Set default project name if not provided for project creation."""
if self.project_name is None:
self.project_name = os.path.splitext(os.path.basename(self.file_names[0]))[0]
log.warning(
"`project_name` is not provided. "
f"Using the first geometry file name {self.project_name} as project name."
)
def _validate_submission_context(self):
"""Validate context for submission based on mode."""
if self._submission_mode is None:
raise ValueError("[Internal] Geometry submission context not set.")
if self._submission_mode == SubmissionMode.PROJECT_ROOT and self.solver_version is None:
raise Flow360ValueError("solver_version field is required.")
if self._submission_mode == SubmissionMode.PROJECT_DEPENDENCY:
if self.dependency_name is None or self.dependency_project_id is None:
raise ValueError(
"[Internal] Dependency name and project ID must be set for geometry dependency submission."
)
@property
def file_names(self) -> List[str]:
"""Geometry file paths as a list."""
if isinstance(self._file_names, str):
return [self._file_names]
return self._file_names
def set_dependency_context(
self,
name: str,
project_id: str,
) -> None:
"""
Configure this draft to add geometry to an existing project.
Called internally by Geometry.import_to_project().
"""
self._submission_mode = SubmissionMode.PROJECT_DEPENDENCY
self.dependency_name = name
self.dependency_project_id = project_id
def _preprocess_mapbc_files(self) -> List[str]:
"""Find and return associated mapbc files for UGRID geometry files."""
mapbc_files = []
for file_path in self.file_names:
mesh_parser = MeshNameParser(file_path)
if mesh_parser.is_ugrid() and os.path.isfile(
mesh_parser.get_associated_mapbc_filename()
):
file_name_mapbc = mesh_parser.get_associated_mapbc_filename()
mapbc_files.append(file_name_mapbc)
return mapbc_files
def _create_project_root_resource(
self, mapbc_files: List[str], description: str = ""
) -> GeometryMeta:
"""Create a new geometry resource that will be the root of a new project."""
self._set_default_project_name()
req = NewGeometryRequest(
name=self.project_name,
solver_version=self.solver_version,
tags=self.tags,
files=[
GeometryFileMeta(
name=os.path.basename(file_path),
type="main",
)
for file_path in self.file_names + mapbc_files
],
parent_folder_id=self.folder.id if self.folder else "ROOT.FLOW360",
length_unit=self.length_unit,
description=description,
)
resp = RestApi(GeometryInterface.endpoint).post(req.dict())
return GeometryMeta(**resp)
def _create_dependency_resource(
self, mapbc_files: List[str], description: str = "", draft_id: str = "", icon: str = ""
) -> GeometryMeta:
"""Create a geometry resource as a dependency in an existing project."""
req = NewGeometryDependencyRequest(
name=self.dependency_name,
project_id=self.dependency_project_id,
draft_id=draft_id,
files=[
GeometryFileMeta(
name=os.path.basename(file_path),
type="main",
)
for file_path in self.file_names + mapbc_files
],
length_unit=self.length_unit,
tags=self.tags,
description=description,
icon=icon,
)
resp = RestApi(GeometryInterface.endpoint).post(req.dict(), method="dependency")
return GeometryMeta(**resp)
def _upload_files(
self,
info: GeometryMeta,
mapbc_files: List[str],
progress_callback=None,
) -> Geometry:
"""Upload geometry files to the cloud."""
# pylint: disable=protected-access
geometry = Geometry(info.id)
heartbeat_info = {"resourceId": info.id, "resourceType": "Geometry", "stop": False}
# Keep posting the heartbeat to keep server patient about uploading.
heartbeat_thread = threading.Thread(target=post_upload_heartbeat, args=(heartbeat_info,))
heartbeat_thread.start()
try:
for file_path in self.file_names + mapbc_files:
geometry._webapi._upload_file(
remote_file_name=os.path.basename(file_path),
file_name=file_path,
progress_callback=progress_callback,
)
finally:
heartbeat_info["stop"] = True
heartbeat_thread.join()
# Kick off pipeline
geometry._webapi._complete_upload()
# Setting _id will disable "WARNING: You have not submitted..." warning message
self._id = info.id
return geometry
# pylint: disable=duplicate-code
def submit(
self,
description: str = "",
progress_callback=None,
run_async: bool = False,
draft_id: str = "",
icon: str = "",
) -> Geometry:
"""
Submit geometry to cloud.
The behavior depends on how this draft was created:
- If created via Geometry.from_file(): Creates a new project with this geometry as root
- If created via Geometry.import_to_project(): Adds geometry to an existing project
Parameters
----------
description : str, optional
Description of the geometry/project (default is "")
progress_callback : callback, optional
Use for custom progress bar (default is None)
run_async : bool, optional
Whether to return immediately after upload without waiting for processing
(default is False)
draft_id : str, optional
ID of the draft to add geometry to (only used for dependency mode, default is "")
icon : str, optional
Icon for the geometry (only used for dependency mode, default is "")
Returns
-------
Geometry
Geometry object with id
Raises
------
Flow360ValueError
If submission context is not set or user aborts
"""
self._validate_geometry()
self._validate_submission_context()
if not shared_account_confirm_proceed():
raise Flow360ValueError("User aborted resource submit.")
mapbc_files = self._preprocess_mapbc_files()
# Create the geometry resource based on submission mode
if self._submission_mode == SubmissionMode.PROJECT_ROOT:
info = self._create_project_root_resource(mapbc_files, description)
log_message = "Geometry successfully submitted"
else:
info = self._create_dependency_resource(mapbc_files, description, draft_id, icon)
log_message = "New geometry successfully submitted to the project"
# Upload files
geometry = self._upload_files(info, mapbc_files, progress_callback)
log.info(f"{log_message}: {geometry.short_description()}")
if run_async:
return geometry
log.info("Waiting for geometry to be processed.")
return Geometry.from_cloud(info.id)
[docs]
class Geometry(AssetBase):
"""
Geometry component for workbench (simulation V2)
"""
_interface_class = GeometryInterface
_meta_class = GeometryMeta
_draft_class = GeometryDraft
_web_api_class = Flow360Resource
_cloud_resource_type_name = "Geometry"
# pylint: disable=redefined-builtin
def __init__(self, id: Union[str, None]):
super().__init__(id)
self.snappy_body_registry = None
@property
def face_group_tag(self):
"getter for face_group_tag"
return self._entity_info.face_group_tag
@face_group_tag.setter
def face_group_tag(self, new_value: str):
raise SyntaxError("Cannot set face_group_tag, use group_faces_by_tag() instead.")
@property
def edge_group_tag(self):
"getter for edge_group_tag"
return self._entity_info.edge_group_tag
@edge_group_tag.setter
def edge_group_tag(self, new_value: str):
raise SyntaxError("Cannot set edge_group_tag, use group_edges_by_tag() instead.")
@property
def body_group_tag(self):
"getter for body_group_tag"
return self._entity_info.body_group_tag
@body_group_tag.setter
def body_group_tag(self, new_value: str):
raise SyntaxError("Cannot set body_group_tag, use group_bodies_by_tag() instead.")
@property
def snappy_bodies(self):
"""Getter for the snappy registry."""
if self.snappy_body_registry is None:
raise Flow360ValueError(
"The faces in geometry are not grouped for snappy."
"Please use `group_faces_for_snappy` function to group them first."
)
return self.snappy_body_registry
[docs]
def get_dynamic_default_settings(self, simulation_dict: dict):
"""Get the default geometry settings from the simulation dict"""
def _get_default_geometry_accuracy(simulation_dict: dict) -> LengthType.Positive:
"""Get the default geometry accuracy from the simulation json"""
if simulation_dict.get("meshing") is None:
return None
if simulation_dict["meshing"].get("defaults") is None:
return None
if simulation_dict["meshing"]["defaults"].get("geometry_accuracy") is None:
return None
# pylint: disable=no-member
return LengthType.validate(simulation_dict["meshing"]["defaults"]["geometry_accuracy"])
self.default_settings["geometry_accuracy"] = (
self._entity_info.default_geometry_accuracy
if self._entity_info.default_geometry_accuracy
else _get_default_geometry_accuracy(simulation_dict=simulation_dict)
)
[docs]
@classmethod
# pylint: disable=redefined-builtin
def from_cloud(cls, id: str, **kwargs) -> Geometry:
"""Create asset with the given ID"""
asset_obj = super().from_cloud(id, **kwargs)
return asset_obj
[docs]
@classmethod
# pylint: disable=too-many-arguments
def from_file(
cls,
file_names: Union[List[str], str],
project_name: str = None,
solver_version: str = None,
length_unit: LengthUnitType = "m",
tags: List[str] = None,
folder: Optional[Folder] = None,
) -> GeometryDraft:
# For type hint only but proper fix is to fully abstract the Draft class too.
return super().from_file(
file_names, project_name, solver_version, length_unit, tags, folder=folder
)
[docs]
@classmethod
# pylint: disable=too-many-arguments
def import_to_project(
cls,
name: str,
file_names: Union[List[str], str],
project_id: str,
length_unit: LengthUnitType = "m",
tags: List[str] = None,
) -> GeometryDraft:
"""
Create a geometry draft for adding to an existing project.
This creates a geometry that will be added as a supplementary component
(dependency) to an existing project, rather than creating a new project.
Parameters
----------
name : str
Name for the geometry component
file_names : Union[List[str], str]
Path(s) to the geometry file(s)
project_id : str
ID of the existing project to add this geometry to
length_unit : LengthUnitType, optional
Unit of length (default is "m")
tags : List[str], optional
Tags to assign to the geometry (default is None)
Returns
-------
GeometryDraft
A draft configured for submission to an existing project
"""
draft = GeometryDraft(
file_names=file_names,
length_unit=length_unit,
tags=tags,
)
draft.set_dependency_context(name=name, project_id=project_id)
return draft
[docs]
def show_available_groupings(self, verbose_mode: bool = False):
"""Display all the possible groupings for faces and edges"""
self._show_available_entity_groups(
"faces",
ignored_attribute_tags=["__all__", "faceId"],
show_ids_in_each_group=verbose_mode,
)
self._show_available_entity_groups(
"edges",
ignored_attribute_tags=["__all__", "edgeId"],
show_ids_in_each_group=verbose_mode,
)
self._show_available_entity_groups(
"bodies",
ignored_attribute_tags=["__all__", "bodyId"],
show_ids_in_each_group=verbose_mode,
)
[docs]
@classmethod
def from_local_storage(
cls, geometry_id: str = None, local_storage_path="", meta_data: GeometryMeta = None
) -> Geometry:
"""
Parameters
----------
geometry_id : str
ID of the geometry resource
local_storage_path:
The folder of the project, defaults to current working directory
Returns
-------
Geometry
Geometry object
"""
return super()._from_local_storage(
asset_id=geometry_id, local_storage_path=local_storage_path, meta_data=meta_data
)
def _show_available_entity_groups(
self,
entity_type_name: Literal["faces", "edges", "bodies"],
ignored_attribute_tags: list = None,
show_ids_in_each_group: bool = False,
) -> None:
"""
Display all the grouping info for the given entity type
"""
if entity_type_name not in ["faces", "edges", "bodies"]:
raise Flow360ValueError(
f"entity_type_name: {entity_type_name} is invalid. Valid options are: ['faces', 'edges', 'bodies']"
)
# pylint: disable=no-member
if entity_type_name == "faces":
attribute_names = self._entity_info.face_attribute_names
grouped_items = self._entity_info.grouped_faces
elif entity_type_name == "edges":
attribute_names = self._entity_info.edge_attribute_names
grouped_items = self._entity_info.grouped_edges
else:
attribute_names = self._entity_info.body_attribute_names
grouped_items = self._entity_info.grouped_bodies
log.info(f" >> Available attribute tags for grouping **{entity_type_name}**:")
for tag_index, attribute_tag in enumerate(attribute_names):
if ignored_attribute_tags is not None and attribute_tag in ignored_attribute_tags:
continue
log.info(
f" >> Tag '{tag_index}': {attribute_tag}. Grouping with this tag results in:"
)
for index, entity in enumerate(grouped_items[tag_index]):
log.info(f" >> [{index}]: {entity.name}")
if show_ids_in_each_group is True:
log.info(f" IDs: {entity.private_attribute_sub_components}")
[docs]
def group_faces_by_tag(self, tag_name: str) -> None:
"""
Group faces by tag name
"""
# pylint: disable=protected-access,no-member
self.internal_registry = self._entity_info._group_entity_by_tag(
"face", tag_name, self.internal_registry
)
[docs]
def group_edges_by_tag(self, tag_name: str) -> None:
"""
Group edges by tag name
"""
# pylint: disable=protected-access,no-member
self.internal_registry = self._entity_info._group_entity_by_tag(
"edge", tag_name, self.internal_registry
)
[docs]
def group_bodies_by_tag(self, tag_name: str) -> None:
"""
Group bodies by tag name
"""
# pylint: disable=protected-access,no-member
self.internal_registry = self._entity_info._group_entity_by_tag(
"body", tag_name, self.internal_registry
)
[docs]
def group_faces_for_snappy(self) -> None:
"""
Group faces according to body::region convention for snappyHexMesh.
"""
# pylint: disable=protected-access,no-member
self.internal_registry = self._entity_info._group_entity_by_tag(
"face", "faceId", self.internal_registry
)
# pylint: disable=protected-access
self.snappy_body_registry = self._entity_info._group_faces_by_snappy_format()
[docs]
def reset_face_grouping(self) -> None:
"""Reset the face grouping"""
# pylint: disable=protected-access,no-member
self.internal_registry = self._entity_info._reset_grouping("face", self.internal_registry)
if self.snappy_body_registry is not None:
self.snappy_body_registry = self.snappy_body._reset_grouping(
"face", self.snappy_body_registry
)
[docs]
def reset_edge_grouping(self) -> None:
"""Reset the edge grouping"""
# pylint: disable=protected-access,no-member
self.internal_registry = self._entity_info._reset_grouping("edge", self.internal_registry)
[docs]
def reset_body_grouping(self) -> None:
"""Reset the body grouping"""
# pylint: disable=protected-access,no-member
self.internal_registry = self._entity_info._reset_grouping("body", self.internal_registry)
def _rename_entity(
self,
entity_type_name: Literal["face", "edge", "body"],
current_name_pattern: str,
new_name_prefix: str,
):
"""
Rename the entity
Parameters
----------
entity_type_name : Literal["face", "edge", "body"]
The type of entity that needs renaming
current_name_pattern:
The current name of a single entity or the name pattern of the entities
new_name_prefix:
The new name of a single entity or the new name prefix of the entities
"""
# pylint: disable=too-many-boolean-expressions
if (
(entity_type_name == "face" and not self.face_group_tag)
or (entity_type_name == "edge" and not self.edge_group_tag)
or (entity_type_name == "body" and not self.body_group_tag)
):
raise Flow360ValueError(
f"Renaming failed: Could not find {entity_type_name} grouping info in the draft's simulation settings."
"Please group them first before renaming the entities."
)
matched_entities = self.internal_registry.find_by_naming_pattern(
pattern=current_name_pattern
)
if entity_type_name == "body":
matched_entities = [
entity for entity in matched_entities if isinstance(entity, GeometryBodyGroup)
]
if entity_type_name == "face":
matched_entities = [
entity for entity in matched_entities if isinstance(entity, Surface)
]
if entity_type_name == "edge":
matched_entities = [entity for entity in matched_entities if isinstance(entity, Edge)]
matched_entities = sorted(
matched_entities,
key=lambda x: x.name,
)
if len(matched_entities) == 0:
raise Flow360ValueError(
f"Renaming failed: No entity is found to match the input name pattern: {current_name_pattern}."
)
for idx, entity in enumerate(matched_entities):
new_name = (
f"{new_name_prefix}_{(idx+1):04d}" if len(matched_entities) > 1 else new_name_prefix
)
if self.internal_registry.find_by_naming_pattern(new_name):
raise Flow360ValueError(
f"Renaming failed: An entity with the new name: {new_name} already exists."
)
with model_attribute_unlock(entity, "name"):
entity.name = new_name
[docs]
def rename_edges(self, current_name_pattern: str, new_name_prefix: str):
"""
Rename the edge in the current edge group
Parameters
----------
current_name_pattern:
The current name of a single edge or the name pattern of the edges
new_name_prefix:
The new name of a single edge or the new name prefix of the edges
"""
self._rename_entity(
entity_type_name="edge",
current_name_pattern=current_name_pattern,
new_name_prefix=new_name_prefix,
)
[docs]
def rename_surfaces(self, current_name_pattern: str, new_name_prefix: str):
"""
Rename the face in the current face group
Parameters
----------
current_name_pattern:
The current name of a single face or the name pattern of the faces
new_name_prefix:
The new name of a single face or the new name prefix of the faces
"""
self._rename_entity(
entity_type_name="face",
current_name_pattern=current_name_pattern,
new_name_prefix=new_name_prefix,
)
[docs]
def rename_body_groups(self, current_name_pattern: str, new_name_prefix: str):
"""
Rename the body in the current body group
Parameters
----------
current_name_pattern:
The current name of a single body or the name pattern of the bodies
new_name_prefix:
The new name of a single body or the new name prefix of the bodies
"""
self._rename_entity(
entity_type_name="body",
current_name_pattern=current_name_pattern,
new_name_prefix=new_name_prefix,
)
def __getitem__(self, key: str):
"""
Get the entity by name.
`key` is the name of the entity or the naming pattern if wildcard is used.
"""
# pylint: disable=import-outside-toplevel
from flow360.component.simulation.draft_context import get_active_draft
if get_active_draft() is not None:
log.warning(
"Accessing entities via asset[key] while a DraftContext is active. "
"Use draft.surfaces[key] or draft.body_groups[key] instead to ensure "
"modifications are tracked in the draft's entity_info."
)
if isinstance(key, str) is False:
raise Flow360ValueError(f"Entity naming pattern: {key} is not a string.")
if hasattr(self, "internal_registry") is False or self.internal_registry is None:
raise Flow360ValueError(
"The faces/edges/bodies in geometry are not grouped yet."
"Please use `group_faces_by_tag` or `group_edges_by_tag` function to group them first."
)
# Note: Or we assume group default by just FaceID and EdgeID? Not sure if this is actually useful.
return self.internal_registry.find_by_naming_pattern(
key, enforce_output_as_list=False, error_when_no_match=True
)
def __setitem__(self, key: str, value: Any):
raise NotImplementedError("Assigning/setting entities is not supported.")