"""
Volume mesh component
"""
from __future__ import annotations
import os.path
from enum import Enum
from typing import Iterator, List, Optional, Union
import numpy as np
from pydantic import Extra, Field, validator
from flow360.component.compress_upload import compress_and_upload_chunks
from ..cloud.requests import CopyExampleVolumeMeshRequest, NewVolumeMeshRequest
from ..cloud.rest_api import RestApi
from ..exceptions import (
Flow360CloudFileError,
Flow360FileError,
Flow360NotImplementedError,
Flow360RuntimeError,
Flow360ValueError,
)
from ..log import log
from ..solver_version import Flow360Version
from .case import Case, CaseDraft
from .flow360_params.boundaries import NoSlipWall
from .flow360_params.flow360_params import (
Flow360MeshParams,
Flow360Params,
_GenericBoundaryWrapper,
)
from .flow360_params.params_base import params_generic_validator
from .interfaces import VolumeMeshInterface
from .meshing.params import VolumeMeshingParams
from .resource_base import (
Flow360Resource,
Flow360ResourceBaseModel,
Flow360ResourceListBase,
ResourceDraft,
)
from .types import COMMENTS
from .utils import shared_account_confirm_proceed, validate_type, zstd_compress
from .validator import Validator
try:
import h5py
_H5PY_AVAILABLE = True
except ImportError:
_H5PY_AVAILABLE = False
def get_datatype(dataset):
"""
Get datatype of dataset
:param dataset:
:return:
"""
data_raw = np.empty(dataset.shape, dataset.dtype)
dataset.read_direct(data_raw)
data_str = "".join([chr(i) for i in dataset])
return data_str
def get_no_slip_walls(params: Union[Flow360Params, Flow360MeshParams]):
"""
Get wall boundary names
:param params:
:return:
"""
assert params
if (
isinstance(params, Flow360MeshParams)
and params.boundaries
and params.boundaries.no_slip_walls
):
return params.boundaries.no_slip_walls
if isinstance(params, Flow360Params) and params.boundaries:
return [
wall_name
for wall_name, wall in params.boundaries.dict().items()
if wall_name != COMMENTS and _GenericBoundaryWrapper(v=wall).v.type == NoSlipWall().type
]
return []
def get_boundaries_from_sliding_interfaces(params: Union[Flow360Params, Flow360MeshParams]):
"""
Get wall boundary names
:param params:
:return:
"""
assert params
res = []
# Sliding interfaces are deprecated - we need to handle this somehow
# if params.sliding_interfaces and params.sliding_interfaces.rotating_patches:
# res += params.sliding_interfaces.rotating_patches[:]
# if params.sliding_interfaces and params.sliding_interfaces.stationary_patches:
# res += params.sliding_interfaces.stationary_patches[:]
return res
# pylint: disable=too-many-branches
def get_boundaries_from_file(cgns_file: str, solver_version: str = None):
"""
Get boundary names from CGNS file
:param cgns_file:
:param solver_version:
:return:
"""
names = []
with h5py.File(cgns_file, "r") as h5_file:
base = h5_file["Base"]
for zone_name, zone in base.items():
if zone_name == " data":
continue
if zone.attrs["label"].decode() != "Zone_t":
continue
zone_type = get_datatype(base[f"{zone_name}/ZoneType/ data"])
if zone_type not in ["Structured", "Unstructured"]:
continue
for section_name, section in zone.items():
if section_name == " data":
continue
if "label" not in section.attrs:
continue
if solver_version and Flow360Version(solver_version) < Flow360Version(
"release-22.2.1.0"
):
if section.attrs["label"].decode() != "Elements_t":
continue
element_type_tag = int(zone[f"{section_name}/ data"][0])
if element_type_tag in [5, 7]:
names.append(f"{zone_name}/{section_name}")
if element_type_tag == 20:
first_element_type_tag = zone[f"{section_name}/ElementConnectivity/ data"][
0
]
if first_element_type_tag in [5, 7]:
names.append(f"{zone_name}/{section_name}")
else:
if section.attrs["label"].decode() != "ZoneBC_t":
continue
for bc_name, bc_zone in section.items():
if bc_zone.attrs["label"].decode() == "BC_t":
names.append(f"{zone_name}/{bc_name}")
return names
def validate_cgns(
cgns_file: str, params: Union[Flow360Params, Flow360MeshParams], solver_version=None
):
"""
Validate CGNS file
:param cgns_file:
:param params:
:param solver_version:
:return:
"""
assert cgns_file
assert params
boundaries_in_file = get_boundaries_from_file(cgns_file, solver_version)
boundaries_in_params = get_no_slip_walls(params) + get_boundaries_from_sliding_interfaces(
params
)
boundaries_in_file = set(boundaries_in_file)
boundaries_in_params = set(boundaries_in_params)
if not boundaries_in_file.issuperset(boundaries_in_params):
raise Flow360ValueError(
"The following input boundary names from mesh json are not found in mesh:"
+ f" {' '.join(boundaries_in_params - boundaries_in_file)}."
+ f" Boundary names in cgns: {' '.join(boundaries_in_file)}"
+ f" Boundary names in params: {' '.join(boundaries_in_file)}"
)
log.info(
f'Notice: {" ".join(boundaries_in_file - boundaries_in_params)} is '
+ "tagged as wall in cgns file, but not in input params"
)
class VolumeMeshLog(Enum):
"""
Volume mesh log
"""
USER_LOG = "user.log"
PY_LOG = "validateFlow360Mesh.py.log"
class VolumeMeshDownloadable(Enum):
"""
Volume mesh downloadable files
"""
CONFIG_JSON = "config.json"
class VolumeMeshFileFormat(Enum):
"""
Volume mesh file format
"""
UGRID = "aflr3"
CGNS = "cgns"
def ext(self) -> str:
"""
Get the extention for a file name.
:return:
"""
if self is VolumeMeshFileFormat.UGRID:
return ".ugrid"
if self is VolumeMeshFileFormat.CGNS:
return ".cgns"
return ""
@classmethod
def detect(cls, file: str):
"""
detects mesh format from filename
"""
ext = os.path.splitext(file)[1]
if ext == VolumeMeshFileFormat.UGRID.ext():
return VolumeMeshFileFormat.UGRID
if ext == VolumeMeshFileFormat.CGNS.ext():
return VolumeMeshFileFormat.CGNS
raise Flow360RuntimeError(f"Unsupported file format {file}")
class UGRIDEndianness(Enum):
"""
UGRID endianness
"""
LITTLE = "little"
BIG = "big"
NONE = None
def ext(self) -> str:
"""
Get the extention for a file name.
:return:
"""
if self is UGRIDEndianness.LITTLE:
return ".lb8"
if self is UGRIDEndianness.BIG:
return ".b8"
return ""
@classmethod
def detect(cls, file: str):
"""
detects endianess UGRID mesh from filename
"""
if VolumeMeshFileFormat.detect(file) is not VolumeMeshFileFormat.UGRID:
return UGRIDEndianness.NONE
basename = os.path.splitext(file)[0]
ext = os.path.splitext(basename)[1]
if ext == UGRIDEndianness.LITTLE.ext():
return UGRIDEndianness.LITTLE
if ext == UGRIDEndianness.BIG.ext():
return UGRIDEndianness.BIG
raise Flow360RuntimeError(f"Unknown endianness for file {file}")
class CompressionFormat(Enum):
"""
Volume mesh file format
"""
GZ = "gz"
BZ2 = "bz2"
ZST = "zst"
NONE = None
def ext(self) -> str:
"""
Get the extention for a file name.
:return:
"""
if self is CompressionFormat.GZ:
return ".gz"
if self is CompressionFormat.BZ2:
return ".bz2"
if self is CompressionFormat.ZST:
return ".zst"
return ""
@classmethod
def detect(cls, file: str):
"""
detects compression from filename
"""
file_name, ext = os.path.splitext(file)
if ext == CompressionFormat.GZ.ext():
return CompressionFormat.GZ, file_name
if ext == CompressionFormat.BZ2.ext():
return CompressionFormat.BZ2, file_name
if ext == CompressionFormat.ZST.ext():
return CompressionFormat.ZST, file_name
return CompressionFormat.NONE, file
# pylint: disable=E0213
class VolumeMeshMeta(Flow360ResourceBaseModel, extra=Extra.allow):
"""
VolumeMeshMeta component
"""
id: str = Field(alias="meshId")
name: str = Field(alias="meshName")
created_at: str = Field(alias="meshAddTime")
surface_mesh_id: Optional[str] = Field(alias="surfaceMeshId")
mesh_params: Union[Flow360MeshParams, None, dict] = Field(alias="meshParams")
mesh_format: Union[VolumeMeshFileFormat, None] = Field(alias="meshFormat")
file_name: Union[str, None] = Field(alias="fileName")
endianness: UGRIDEndianness = Field(alias="meshEndianness")
compression: CompressionFormat = Field(alias="meshCompression")
boundaries: Union[List, None]
@validator("mesh_params", pre=True)
def init_mesh_params(cls, value):
"""
validator for mesh_params
"""
return params_generic_validator(value, Flow360MeshParams)
@validator("endianness", pre=True)
def init_endianness(cls, value):
"""
validator for endianess
"""
return UGRIDEndianness(value) or UGRIDEndianness.NONE
@validator("compression", pre=True)
def init_compression(cls, value):
"""
validator for compression
"""
try:
return CompressionFormat(value)
except ValueError:
return CompressionFormat.NONE
def to_volume_mesh(self) -> VolumeMesh:
"""
returns VolumeMesh object from volume mesh meta info
"""
return VolumeMesh(self.id)
class VolumeMeshDraft(ResourceDraft):
"""
Volume mesh draft component (before submit)
"""
# pylint: disable=too-many-arguments, too-many-instance-attributes
def __init__(
self,
file_name: str = None,
params: Union[Flow360MeshParams, VolumeMeshingParams] = None,
name: str = None,
surface_mesh_id=None,
tags: List[str] = None,
solver_version=None,
endianess: UGRIDEndianness = None,
isascii: bool = False,
):
if file_name is not None and not os.path.exists(file_name):
raise Flow360FileError(f"File '{file_name}' not found.")
if endianess is not None:
raise Flow360NotImplementedError(
"endianess selections not supported, it is inferred from filename"
)
if isascii is True:
raise Flow360NotImplementedError("isascii not supported")
self.params = None
if params is not None:
if not isinstance(params, Flow360MeshParams) and not isinstance(
params, VolumeMeshingParams
):
raise ValueError(
f"params={params} are not of type Flow360MeshParams OR VolumeMeshingParams"
)
self.params = params.copy(deep=True)
if name is None and file_name is not None:
name = os.path.splitext(os.path.basename(file_name))[0]
self.file_name = file_name
self.name = name
self.surface_mesh_id = surface_mesh_id
self.tags = tags
self.solver_version = solver_version
self._id = None
self.compress_method = CompressionFormat.ZST
ResourceDraft.__init__(self)
def _submit_from_surface(self):
self.validator_api(self.params, solver_version=self.solver_version)
body = {
"name": self.name,
"tags": self.tags,
"surfaceMeshId": self.surface_mesh_id,
"config": self.params.flow360_json(),
"format": "cgns",
}
if self.solver_version:
body["solverVersion"] = self.solver_version
resp = RestApi(VolumeMeshInterface.endpoint).post(body)
if not resp:
return None
info = VolumeMeshMeta(**resp)
self._id = info.id
mesh = VolumeMesh(self.id)
log.info(f"VolumeMesh successfully submitted: {mesh.short_description()}")
return mesh
# pylint: disable=protected-access, too-many-locals
def _submit_upload_mesh(self, progress_callback=None):
assert os.path.exists(self.file_name)
original_compression, file_name_no_compression = CompressionFormat.detect(self.file_name)
mesh_format = VolumeMeshFileFormat.detect(file_name_no_compression)
endianness = UGRIDEndianness.detect(file_name_no_compression)
if mesh_format is VolumeMeshFileFormat.CGNS:
remote_file_name = "volumeMesh"
else:
remote_file_name = "mesh"
compression = (
original_compression
if original_compression != CompressionFormat.NONE
else self.compress_method
)
remote_file_name = (
f"{remote_file_name}{endianness.ext()}{mesh_format.ext()}{compression.ext()}"
)
name = self.name
if name is None:
name = os.path.splitext(os.path.basename(self.file_name))[0]
req = NewVolumeMeshRequest(
name=name,
file_name=remote_file_name,
tags=self.tags,
format=mesh_format.value,
endianness=endianness.value,
compression=compression.value,
params=self.params,
solver_version=self.solver_version,
)
resp = RestApi(VolumeMeshInterface.endpoint).post(req.dict())
if not resp:
return None
info = VolumeMeshMeta(**resp)
self._id = info.id
mesh = VolumeMesh(self.id)
# parallel compress and upload
if (
original_compression == CompressionFormat.NONE
and self.compress_method == CompressionFormat.BZ2
):
upload_id = mesh.create_multipart_upload(remote_file_name)
compress_and_upload_chunks(self.file_name, upload_id, mesh, remote_file_name)
elif (
original_compression == CompressionFormat.NONE
and self.compress_method == CompressionFormat.ZST
):
compressed_file_name = zstd_compress(self.file_name)
mesh._upload_file(
remote_file_name, compressed_file_name, progress_callback=progress_callback
)
os.remove(compressed_file_name)
else:
mesh._upload_file(remote_file_name, self.file_name, progress_callback=progress_callback)
mesh._complete_upload(remote_file_name)
log.info(f"VolumeMesh successfully uploaded: {mesh.short_description()}")
return mesh
def submit(self, progress_callback=None) -> VolumeMesh:
"""submit mesh to cloud
Parameters
----------
progress_callback : callback, optional
Use for custom progress bar, by default None
Returns
-------
VolumeMesh
VolumeMesh object with id
"""
if not shared_account_confirm_proceed():
raise Flow360ValueError("User aborted resource submit.")
if self.file_name is not None:
return self._submit_upload_mesh(progress_callback)
if self.surface_mesh_id is not None and self.name is not None and self.params is not None:
return self._submit_from_surface()
raise Flow360ValueError(
"You must provide volume mesh file for upload or surface mesh Id with meshing parameters."
)
@classmethod
def validator_api(cls, params: VolumeMeshingParams, solver_version=None):
"""
validation api: validates surface meshing parameters before submitting
"""
return Validator.VOLUME_MESH.validate(params, solver_version=solver_version)
[docs]
class VolumeMesh(Flow360Resource):
"""
Volume mesh component
"""
# pylint: disable=redefined-builtin
def __init__(self, id: str):
super().__init__(
interface=VolumeMeshInterface,
info_type_class=VolumeMeshMeta,
id=id,
)
self.__mesh_params = None
@classmethod
def _from_meta(cls, meta: VolumeMeshMeta):
validate_type(meta, "meta", VolumeMeshMeta)
volume_mesh = cls(id=meta.id)
volume_mesh._set_meta(meta)
return volume_mesh
@property
def info(self) -> VolumeMeshMeta:
return super().info
@property
def _mesh_params(self) -> Flow360MeshParams:
"""
returns mesh params
"""
if self.__mesh_params is None:
self.__mesh_params = self.info.mesh_params
return self.__mesh_params
@property
def no_slip_walls(self):
"""
returns mesh no_slip_walls
"""
if self._mesh_params is None:
return None
return self._mesh_params.boundaries.no_slip_walls
@property
def all_boundaries(self):
"""
returns mesh no_slip_walls
"""
return self.info.boundaries
# pylint: disable=too-many-arguments,R0801
[docs]
def download_file(
self,
file_name: Union[str, VolumeMeshDownloadable],
to_file=None,
to_folder=".",
overwrite: bool = True,
progress_callback=None,
**kwargs,
):
"""
Download file from surface mesh
:param file_name:
:param to_file:
:return:
"""
if isinstance(file_name, VolumeMeshDownloadable):
file_name = file_name.value
return super()._download_file(
file_name,
to_file=to_file,
to_folder=to_folder,
overwrite=overwrite,
progress_callback=progress_callback,
**kwargs,
)
# pylint: disable=R0801
[docs]
def download(self, to_file=None, to_folder=".", overwrite: bool = True):
"""
Download volume mesh file
:param to_file:
:return:
"""
status = self.status
if not status.is_final():
log.warning(f"Cannot download file because status={status}")
return None
remote_file_name = self.info.file_name
if remote_file_name is None:
remote_file_name = self._remote_file_name()
return super()._download_file(
remote_file_name,
to_file=to_file,
to_folder=to_folder,
overwrite=overwrite,
)
def _complete_upload(self, remote_file_name):
"""
Complete volume mesh upload
:return:
"""
resp = self.post({}, method=f"completeUpload?fileName={remote_file_name}")
self._info = VolumeMeshMeta(**resp)
@classmethod
def _interface(cls):
return VolumeMeshInterface
@classmethod
def _meta_class(cls):
"""
returns volume mesh meta info class: VolumeMeshMeta
"""
return VolumeMeshMeta
@classmethod
def _params_ancestor_id_name(cls):
"""
returns surfaceMeshId name
"""
return "surfaceMeshId"
[docs]
@classmethod
def from_cloud(cls, mesh_id: str):
"""
Get volume mesh info from cloud
:param mesh_id:
:return:
"""
return cls(mesh_id)
def _get_file_extention(self):
compression = self.info.compression
mesh_format = self.info.mesh_format
endianness = self.info.endianness
return f"{endianness.ext()}{mesh_format.ext()}{compression.ext()}"
def _remote_file_name(self):
"""
mesh filename on cloud
"""
remote_file_name = None
for file in self.get_download_file_list():
_, file_name_no_compression = CompressionFormat.detect(file["fileName"])
try:
VolumeMeshFileFormat.detect(file_name_no_compression)
remote_file_name = file["fileName"]
except Flow360RuntimeError:
continue
if remote_file_name is None:
raise Flow360CloudFileError(f"No volume mesh file found for id={self.id}")
return remote_file_name
[docs]
@classmethod
def from_file(
cls,
file_name: str,
params: Union[Flow360MeshParams, None] = None,
name: str = None,
tags: List[str] = None,
solver_version=None,
endianess: UGRIDEndianness = None,
isascii: bool = False,
) -> VolumeMeshDraft:
"""
Upload volume mesh from file
:param volume_mesh_name:
:param file_name:
:param params:
:param tags:
:param solver_version:
:return:
"""
return VolumeMeshDraft(
file_name=file_name,
name=name,
tags=tags,
solver_version=solver_version,
params=params,
endianess=endianess,
isascii=isascii,
)
[docs]
@classmethod
def copy_from_example(
cls,
example_id: str,
name: str = None,
) -> VolumeMesh:
"""
Create a new volume mesh by copying from an example mesh identified by `example_id`.
Parameters
----------
example_id : str
The unique identifier of the example volume mesh to copy from.
name : str, optional
The name to assign to the new volume mesh. If not provided, the name
of the example volume mesh will be used.
Returns
-------
VolumeMesh
A new instance of VolumeMesh copied from the example mesh if successful.
Examples
--------
>>> new_mesh = VolumeMesh.copy_from_example('example_id_123', name='New Mesh')
"""
if name is None:
eg_vm = cls(example_id)
name = eg_vm.name
req = CopyExampleVolumeMeshRequest(example_id=example_id, name=name)
resp = RestApi(f"{VolumeMeshInterface.endpoint}/examples/copy").post(req.dict())
if not resp:
raise RuntimeError("Something went wrong when accessing example mesh.")
info = VolumeMeshMeta(**resp)
return cls(info.id)
[docs]
@classmethod
def create(
cls,
name: str,
params: VolumeMeshingParams,
surface_mesh_id,
tags: List[str] = None,
solver_version=None,
) -> VolumeMeshDraft:
"""
Create volume mesh from surface mesh
"""
return VolumeMeshDraft(
name=name,
surface_mesh_id=surface_mesh_id,
solver_version=solver_version,
params=params,
tags=tags,
)
[docs]
def create_case(
self,
name: str,
params: Flow360Params,
tags: List[str] = None,
solver_version: str = None,
) -> CaseDraft:
"""
Create new case
:param name:
:param params:
:param tags:
:return:
"""
return Case.create(
name, params, volume_mesh_id=self.id, tags=tags, solver_version=solver_version
)
class VolumeMeshList(Flow360ResourceListBase):
"""
VolumeMesh List component
"""
def __init__(
self,
surface_mesh_id: str = None,
from_cloud: bool = True,
include_deleted: bool = False,
limit=100,
):
super().__init__(
ancestor_id=surface_mesh_id,
from_cloud=from_cloud,
include_deleted=include_deleted,
limit=limit,
resourceClass=VolumeMesh,
)
def filter(self):
"""
flitering list, not implemented yet
"""
raise NotImplementedError("Filters are not implemented yet")
# resp = list(filter(lambda i: i['caseStatus'] != 'deleted', resp))
# pylint: disable=useless-parent-delegation
def __getitem__(self, index) -> VolumeMesh:
"""
returns VolumeMeshMeta item of the list
"""
return super().__getitem__(index)
# pylint: disable=useless-parent-delegation
def __iter__(self) -> Iterator[VolumeMesh]:
return super().__iter__()