Skip to content
5 changes: 4 additions & 1 deletion flow360/component/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import json
import os
import tempfile
from typing import Any, Iterator, List, Optional, Union
from typing import TYPE_CHECKING, Any, Iterator, List, Optional, Union

import pydantic as pd
import pydantic.v1 as pd_v1
Expand Down Expand Up @@ -76,6 +76,9 @@
from .v1.flow360_params import Flow360Params, UnvalidatedFlow360Params
from .validator import Validator

if TYPE_CHECKING:
from flow360.component.volume_mesh import VolumeMeshV2


class CaseBase:
"""
Expand Down
1 change: 0 additions & 1 deletion flow360/component/results/base_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,6 @@ def full_name_pattern(word: str) -> re.Pattern:
return rf"^(?:{re.escape(word)}|[^/]+/{re.escape(word)})$"

self.reload_data() # Remove all the imposed filters
print(">> _x_columns =", self._x_columns)
raw_values = {}
for x_column in self._x_columns:
raw_values[x_column] = np.array(self.raw_values[x_column])
Expand Down
150 changes: 67 additions & 83 deletions flow360/component/simulation/framework/entity_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import copy
import hashlib
import uuid
from abc import ABCMeta
Expand Down Expand Up @@ -291,15 +290,62 @@ def _valid_individual_input(cls, input_data):
"Expected entity instance."
)

@classmethod
def _process_selector(cls, selector: EntitySelector, valid_type_names: List[str]) -> dict:
"""Process and validate an EntitySelector object."""
if selector.target_class not in valid_type_names:
raise ValueError(
f"Selector target_class ({selector.target_class}) is incompatible "
f"with EntityList types {valid_type_names}."
)
return selector.model_dump()

@classmethod
def _process_entity(cls, entity: EntityBase, valid_types: tuple) -> Optional[EntityBase]:
"""Process and validate an entity object. Returns None if entity type is invalid."""
cls._valid_individual_input(entity)
if is_exact_instance(entity, valid_types):
return entity
return None

@classmethod
def _build_result(
cls, entities_to_store: List[EntityBase], entity_patterns_to_store: List[dict]
) -> dict:
"""Build the final result dictionary."""
return {
"stored_entities": entities_to_store,
"selectors": entity_patterns_to_store if entity_patterns_to_store else None,
}

@classmethod
# pylint: disable=too-many-arguments
def _process_single_item(
cls,
item: Union[EntityBase, EntitySelector],
valid_types: tuple,
valid_type_names: List[str],
entities_to_store: List[EntityBase],
entity_patterns_to_store: List[dict],
) -> None:
"""Process a single item (entity or selector) and add to appropriate storage lists."""
if isinstance(item, EntitySelector):
entity_patterns_to_store.append(cls._process_selector(item, valid_type_names))
else:
processed_entity = cls._process_entity(item, valid_types)
if processed_entity is not None:
entities_to_store.append(processed_entity)

@pd.model_validator(mode="before")
@classmethod
def deserializer(cls, input_data: Union[dict, list]):
def deserializer(cls, input_data: Union[dict, list, EntityBase, EntitySelector]):
"""
Flatten List[EntityBase] and put into stored_entities.
"""
entities_to_store = []
entity_patterns_to_store = []
valid_types = cls._get_valid_entity_types()
valid_type_names = [t.__name__ for t in valid_types]

if isinstance(input_data, list):
# -- User input mode. --
Expand All @@ -319,96 +365,34 @@ def deserializer(cls, input_data: Union[dict, list]):
]
)
else:
cls._valid_individual_input(item)
if is_exact_instance(item, tuple(valid_types)):
entities_to_store.append(item)
cls._process_single_item(
item,
tuple(valid_types),
valid_type_names,
entities_to_store,
entity_patterns_to_store,
)
elif isinstance(input_data, dict): # Deserialization
if "stored_entities" not in input_data:
raise KeyError(
f"Invalid input type to `entities`, dict {input_data} is missing the key 'stored_entities'."
)
return {
"stored_entities": input_data["stored_entities"],
"selectors": None if not entity_patterns_to_store else entity_patterns_to_store,
}
# pylint: disable=no-else-return
else: # Single entity
return cls._build_result(input_data["stored_entities"], input_data.get("selectors", []))
else: # Single entity or selector
if input_data is None:
return {
"stored_entities": None,
"selectors": None if not entity_patterns_to_store else entity_patterns_to_store,
}
else:
cls._valid_individual_input(input_data)
if is_exact_instance(input_data, tuple(valid_types)):
entities_to_store.append(input_data)
return cls._build_result(None, [])
cls._process_single_item(
input_data,
tuple(valid_types),
valid_type_names,
entities_to_store,
entity_patterns_to_store,
)

if not entities_to_store:
if not entities_to_store and not entity_patterns_to_store:
raise ValueError(
f"Can not find any valid entity of type {[valid_type.__name__ for valid_type in valid_types]}"
f" from the input."
)

return {
"stored_entities": entities_to_store,
"selectors": None if not entity_patterns_to_store else entity_patterns_to_store,
}

def _get_expanded_entities(
self,
*,
create_hard_copy: bool,
) -> List[EntityBase]:
"""
Processes `stored_entities` to remove duplicate entities and raise error if conflicting entities are found.

Possible future upgrade includes expanding `TokenEntity` (naming pattern, enabling compact data storage
like MatrixType and also templating SimulationParams which is planned when importing JSON as setting template)

Raises:
TypeError: If an entity does not match the expected type.
Returns:
Expanded entities list.
"""

entities = getattr(self, "stored_entities", [])

expanded_entities = []
# Note: Points need to skip deduplication bc:
# 1. Performance of deduplication is slow when Point count is high.
not_merged_entity_types_name = [
"Point"
] # Entity types that need skipping deduplication (hacky)
not_merged_entities = []

# pylint: disable=not-an-iterable
for entity in entities:
if entity.private_attribute_entity_type_name in not_merged_entity_types_name:
not_merged_entities.append(entity)
continue
# if entity not in expanded_entities:
expanded_entities.append(entity)

expanded_entities = _remove_duplicate_entities(expanded_entities)
expanded_entities += not_merged_entities

if not expanded_entities:
raise ValueError(
f"Failed to find any matching entity with {entities}. Please check the input to entities."
)
# pylint: disable=fixme
# TODO: As suggested by Runda. We better prompt user what entities are actually used/expanded to
# TODO: avoid user input error. We need a switch to turn it on or off.
if create_hard_copy is True:
return copy.deepcopy(expanded_entities)
return expanded_entities

# pylint: disable=arguments-differ
def preprocess(self, **kwargs):
"""
Expand and overwrite self.stored_entities in preparation for submission/serialization.
Should only be called as late as possible to incorporate all possible changes.
"""
# WARNING: this is very expensive all for long lists as it is quadratic
self.stored_entities = self._get_expanded_entities(create_hard_copy=False)
return super().preprocess(**kwargs)
return cls._build_result(entities_to_store, entity_patterns_to_store)
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Scoped context for entity materialization and reuse.

This module provides a context-managed cache and an injectable builder
for converting entity dictionaries to model instances, avoiding global
state and enabling high-performance reuse during validation.
"""

from __future__ import annotations

import contextvars
from typing import Any, Callable, Optional

_entity_cache_ctx: contextvars.ContextVar[Optional[dict]] = contextvars.ContextVar(
"entity_cache", default=None
)
_entity_builder_ctx: contextvars.ContextVar[Optional[Callable[[dict], Any]]] = (
contextvars.ContextVar("entity_builder", default=None)
)


class EntityMaterializationContext:
"""Context manager providing a per-validation scoped cache and builder.

Use this to avoid global state when materializing entity dictionaries
into model instances while reusing objects across the validation pass.
"""

def __init__(self, *, builder: Callable[[dict], Any]):
self._token_cache = None
self._token_builder = None
self._builder = builder

def __enter__(self):
self._token_cache = _entity_cache_ctx.set({})
self._token_builder = _entity_builder_ctx.set(self._builder)
return self

def __exit__(self, exc_type, exc, tb):
_entity_cache_ctx.reset(self._token_cache)
_entity_builder_ctx.reset(self._token_builder)


def get_entity_cache() -> Optional[dict]:
"""Return the current cache dict for entity reuse, or None if not active."""

return _entity_cache_ctx.get()


def get_entity_builder() -> Optional[Callable[[dict], Any]]:
"""Return the current dict->entity builder, or None if not active."""

return _entity_builder_ctx.get()
Loading