From 6e5196ddf86b2aef2884b69cacebe72745fb043a Mon Sep 17 00:00:00 2001 From: Ira Shokar Date: Mon, 22 Dec 2025 16:21:01 +0000 Subject: [PATCH] add cgns and ensight etl tutorials --- .../tutorials/etl_cgns_to_numpy/README.md | 127 +++++++++ .../tutorials/etl_cgns_to_numpy/__init__.py | 25 ++ .../etl_cgns_to_numpy/cgns_data_source.py | 158 +++++++++++ .../cgns_to_numpy_transformation.py | 106 +++++++ .../etl_cgns_to_numpy/generate_sample_data.py | 120 ++++++++ .../etl_cgns_to_numpy/numpy_data_source.py | 110 ++++++++ .../tutorials/etl_cgns_to_numpy/run_etl.py | 93 +++++++ .../etl_cgns_to_numpy/tutorial_config.yaml | 50 ++++ .../etl_cgns_to_numpy/tutorial_validator.py | 263 ++++++++++++++++++ examples/tutorials/etl_cgns_to_zarr/README.md | 129 +++++++++ .../tutorials/etl_cgns_to_zarr/__init__.py | 25 ++ .../etl_cgns_to_zarr/cgns_data_source.py | 158 +++++++++++ .../cgns_to_zarr_transformation.py | 151 ++++++++++ .../etl_cgns_to_zarr/generate_sample_data.py | 120 ++++++++ .../tutorials/etl_cgns_to_zarr/run_etl.py | 93 +++++++ .../etl_cgns_to_zarr/tutorial_config.yaml | 51 ++++ .../etl_cgns_to_zarr/tutorial_validator.py | 263 ++++++++++++++++++ .../etl_cgns_to_zarr/zarr_data_source.py | 118 ++++++++ .../tutorials/etl_ensight_to_numpy/README.md | 144 ++++++++++ .../etl_ensight_to_numpy/__init__.py | 25 ++ .../ensight_data_source.py | 154 ++++++++++ .../ensight_to_numpy_transformation.py | 84 ++++++ .../generate_sample_data.py | 77 +++++ .../etl_ensight_to_numpy/numpy_data_source.py | 110 ++++++++ .../tutorials/etl_ensight_to_numpy/run_etl.py | 64 +++++ .../etl_ensight_to_numpy/tutorial_config.yaml | 51 ++++ .../tutorial_validator.py | 245 ++++++++++++++++ .../tutorials/etl_ensight_to_zarr/README.md | 135 +++++++++ .../tutorials/etl_ensight_to_zarr/__init__.py | 25 ++ .../ensight_data_source.py | 154 ++++++++++ .../ensight_to_zarr_transformation.py | 120 ++++++++ .../generate_sample_data.py | 77 +++++ .../tutorials/etl_ensight_to_zarr/run_etl.py | 81 ++++++ .../etl_ensight_to_zarr/tutorial_config.yaml | 52 ++++ .../etl_ensight_to_zarr/tutorial_validator.py | 245 ++++++++++++++++ .../etl_ensight_to_zarr/zarr_data_source.py | 118 ++++++++ 36 files changed, 4121 insertions(+) create mode 100644 examples/tutorials/etl_cgns_to_numpy/README.md create mode 100644 examples/tutorials/etl_cgns_to_numpy/__init__.py create mode 100644 examples/tutorials/etl_cgns_to_numpy/cgns_data_source.py create mode 100644 examples/tutorials/etl_cgns_to_numpy/cgns_to_numpy_transformation.py create mode 100644 examples/tutorials/etl_cgns_to_numpy/generate_sample_data.py create mode 100644 examples/tutorials/etl_cgns_to_numpy/numpy_data_source.py create mode 100644 examples/tutorials/etl_cgns_to_numpy/run_etl.py create mode 100644 examples/tutorials/etl_cgns_to_numpy/tutorial_config.yaml create mode 100644 examples/tutorials/etl_cgns_to_numpy/tutorial_validator.py create mode 100644 examples/tutorials/etl_cgns_to_zarr/README.md create mode 100644 examples/tutorials/etl_cgns_to_zarr/__init__.py create mode 100644 examples/tutorials/etl_cgns_to_zarr/cgns_data_source.py create mode 100644 examples/tutorials/etl_cgns_to_zarr/cgns_to_zarr_transformation.py create mode 100644 examples/tutorials/etl_cgns_to_zarr/generate_sample_data.py create mode 100644 examples/tutorials/etl_cgns_to_zarr/run_etl.py create mode 100644 examples/tutorials/etl_cgns_to_zarr/tutorial_config.yaml create mode 100644 examples/tutorials/etl_cgns_to_zarr/tutorial_validator.py create mode 100644 examples/tutorials/etl_cgns_to_zarr/zarr_data_source.py create mode 100644 examples/tutorials/etl_ensight_to_numpy/README.md create mode 100644 examples/tutorials/etl_ensight_to_numpy/__init__.py create mode 100644 examples/tutorials/etl_ensight_to_numpy/ensight_data_source.py create mode 100644 examples/tutorials/etl_ensight_to_numpy/ensight_to_numpy_transformation.py create mode 100644 examples/tutorials/etl_ensight_to_numpy/generate_sample_data.py create mode 100644 examples/tutorials/etl_ensight_to_numpy/numpy_data_source.py create mode 100644 examples/tutorials/etl_ensight_to_numpy/run_etl.py create mode 100644 examples/tutorials/etl_ensight_to_numpy/tutorial_config.yaml create mode 100644 examples/tutorials/etl_ensight_to_numpy/tutorial_validator.py create mode 100644 examples/tutorials/etl_ensight_to_zarr/README.md create mode 100644 examples/tutorials/etl_ensight_to_zarr/__init__.py create mode 100644 examples/tutorials/etl_ensight_to_zarr/ensight_data_source.py create mode 100644 examples/tutorials/etl_ensight_to_zarr/ensight_to_zarr_transformation.py create mode 100644 examples/tutorials/etl_ensight_to_zarr/generate_sample_data.py create mode 100644 examples/tutorials/etl_ensight_to_zarr/run_etl.py create mode 100644 examples/tutorials/etl_ensight_to_zarr/tutorial_config.yaml create mode 100644 examples/tutorials/etl_ensight_to_zarr/tutorial_validator.py create mode 100644 examples/tutorials/etl_ensight_to_zarr/zarr_data_source.py diff --git a/examples/tutorials/etl_cgns_to_numpy/README.md b/examples/tutorials/etl_cgns_to_numpy/README.md new file mode 100644 index 0000000..61cd798 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_numpy/README.md @@ -0,0 +1,127 @@ +# PhysicsNeMo-Curator Tutorial: CGNS to NumPy + +## Overview + +This tutorial demonstrates how to use the PhysicsNeMo-Curator ETL pipeline to: + +1. Extract physics simulation data from **CGNS (CFD General Notation System)** files. +2. Transform the data into standard **NumPy arrays** with configurable precision. +3. Write the processed data to efficient, compressed `.npz` files with sidecar metadata. + +## 1. Create a Dataset + +PhysicsNeMo-Curator works with well-defined formats and schemas. For this tutorial, we define a custom simulation dataset using: + +* **Format**: CGNS (Computational Fluid Dynamics General Notation System) +* **Storage**: Local filesystem +* **Schema**: Each simulation run contains a mesh with the following fields: + +| Field Name | Type | Description | +| --- | --- | --- | +| `coordinates` | `(N, 3)` | Spatial coordinates (x, y, z) of mesh points | +| `faces` | `(M, 4)` | Mesh connectivity information (triangulated) | +| `Temperature` | `(N,)` | Scalar temperature field | +| `Pressure` | `(N,)` | Scalar pressure field | +| `Velocity` | `(N, 3)` | 3D velocity vector field | +| `Density` | `(N,)` | Scalar density field | +| `Vorticity` | `(N,)` | Scalar vorticity field | + +### Generate Sample Data + +We have provided a script to generate 5 simulation runs with random physics-like data on a spherical mesh. + +To generate the data: + +```bash +python generate_sample_data.py + +``` + +This will create a `tutorial_data/` directory containing 5 `.cgns` files (e.g., `run_001.cgns`). + +## 2. The ETL Pipeline + +The pipeline consists of four main components orchestrated to process files in parallel. + +### A. Source: `CGNSDataSource` + +* **File**: `cgns_data_source.py` +* **Function**: Reads CGNS files using `pyvista`. It extracts the mesh geometry (`coordinates`, `faces`) and all point data fields (`Temperature`, `Velocity`, etc.). + +### B. Transformation: `CGNSToNumpyTransformation` + +* **File**: `cgns_to_numpy_transformation.py` +* **Function**: Converts raw CGNS data into standard NumPy arrays. +* **Precision Control**: Configurable to output `float32` (default) or `float64`. +* **Vector Handling**: Automatically computes magnitude arrays for 2D/vector fields (e.g., `Velocity` -> `Velocity_magnitude`). +* **Statistics**: Calculates comprehensive statistics (min, max, mean, std) for all fields. + + + +### C. Sink: `NumpyDataSource` + +* **File**: `numpy_data_source.py` +* **Function**: Writes the transformed data to disk. +* **Data**: Saved as compressed `.npz` files (using `np.savez_compressed`). +* **Metadata**: Saved as separate `.json` sidecar files containing file info and calculated statistics. + + + +### D. Validator: `TutorialValidator` + +* **File**: `tutorial_validator.py` +* **Function**: Validates the input CGNS files before processing. It checks for valid mesh structure (points, cells), dimensions, and data integrity (NaN/Inf checks). + +## 3. Configuration + +The pipeline is configured using Hydra via `tutorial_config.yaml`. + +```yaml +etl: + processing: + num_processes: 2 # Parallel execution + + validator: + _target_: tutorial_validator.TutorialValidator + validation_level: "fields" + + source: + _target_: cgns_data_source.CGNSDataSource + + transformations: + cgns_to_numpy: + _target_: cgns_to_numpy_transformation.CGNSToNumpyTransformation + precision: "float32" # or "float64" + + sink: + _target_: numpy_data_source.NumpyDataSource + +``` + +## 4. Run the Pipeline + +To run the ETL pipeline, use the `run_etl.py` script. You must specify the input and output directories. + +```bash +python run_etl.py \ + etl.source.input_dir=tutorial_data \ + etl.sink.output_dir=output_numpy + +``` + +## 5. Output + +After execution, the `output_numpy/` directory will contain paired files for each run: + +1. **`run_001.npz`**: A compressed archive containing the NumPy arrays (`coordinates`, `faces`, `Temperature`, `Velocity`, etc.). +2. **`run_001.json`**: A JSON file containing metadata, such as: +```json +{ + "num_points": 1024, + "precision": "", + "Temperature_mean": 300.5, + "Temperature_std": 12.1, + "Velocity_magnitude_max": 15.2 +} + +``` diff --git a/examples/tutorials/etl_cgns_to_numpy/__init__.py b/examples/tutorials/etl_cgns_to_numpy/__init__.py new file mode 100644 index 0000000..aa5ad60 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_numpy/__init__.py @@ -0,0 +1,25 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tutorial ETL pipeline for HDF5 to Zarr conversion.""" + +from . import ( + h5_data_source, # noqa: F401 + h5_to_zarr_transformation, # noqa: F401 + tutorial_config, # noqa: F401 + tutorial_validator, # noqa: F401 + zarr_data_source, # noqa: F401 +) diff --git a/examples/tutorials/etl_cgns_to_numpy/cgns_data_source.py b/examples/tutorials/etl_cgns_to_numpy/cgns_data_source.py new file mode 100644 index 0000000..1da3925 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_numpy/cgns_data_source.py @@ -0,0 +1,158 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import Any, Dict, List + +import numpy as np +import pyvista as pv + +from physicsnemo_curator.etl.data_sources import DataSource +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class CGNSDataSource(DataSource): + """DataSource for reading CGNS physics simulation files.""" + + def __init__(self, cfg: ProcessingConfig, input_dir: str): + """Initialize the CGNS data source. + + Args: + cfg: Processing configuration + input_dir: Directory containing input CGNS files + """ + super().__init__(cfg) + self.input_dir = Path(input_dir) + + if not self.input_dir.exists(): + raise FileNotFoundError(f"Input directory {self.input_dir} does not exist") + + def get_file_list(self) -> List[str]: + """Get list of CGNS files to process. + + Returns: + List of filenames (without extension) to process + """ + # Find all .cgns files and return their base names + cgns_files = list(self.input_dir.glob("*.cgns")) + filenames = [f.stem for f in cgns_files] # Remove .cgns extension + + self.logger.info(f"Found {len(filenames)} CGNS files to process") + return sorted(filenames) + + def _read_cgns_mesh(self, filepath: Path): + """Read a CGNS file and extract the mesh. + + Args: + filepath: Path to the CGNS file + + Returns: + pyvista mesh object or None if reading fails + """ + try: + reader = pv.CGNSReader(str(filepath)) + # Turn off loading the interior mesh + reader.load_boundary_patch = False + mesh = reader.read() + + # Check if the mesh is valid and contains a block to process + if not mesh or not mesh[0]: + self.logger.warning(f"No valid data found in {filepath}. Skipping.") + return None + + original = mesh[0][0] + return original + + except Exception as e: + self.logger.error(f"Error processing file {filepath}: {e}") + return None + + def read_file(self, filename: str) -> Dict[str, Any]: + """Read one CGNS file and extract all data. + + Args: + filename: Base filename (without extension) + + Returns: + Dictionary containing extracted data and metadata + """ + filepath = self.input_dir / f"{filename}.cgns" + if not filepath.exists(): + raise FileNotFoundError(f"File not found: {filepath}") + + self.logger.warning(f"Reading {filepath}") + + # Read the CGNS mesh + original_mesh = self._read_cgns_mesh(filepath) + if original_mesh is None: + raise ValueError(f"Failed to read CGNS file: {filepath}") + + # Extract surface and triangulate + surface_mesh = original_mesh.extract_surface().triangulate() + + # Convert cell data to point data if present + if surface_mesh.cell_data: + self.logger.info("Found cell data. Converting to point data.") + surface_mesh = surface_mesh.cell_data_to_point_data() + + # Build data dictionary + data = {} + + # Extract coordinates (points) + data["coordinates"] = np.array(surface_mesh.points) + + # Extract connectivity/faces information + data["faces"] = np.array(surface_mesh.faces).reshape(-1, 4)[:, 1:] # Remove size prefix + + # Extract all point data fields + for field_name in surface_mesh.point_data.keys(): + data[field_name] = np.array(surface_mesh.point_data[field_name]) + self.logger.info(f"Extracted point data field: {field_name}") + + # Store metadata + metadata = { + "n_points": surface_mesh.n_points, + "n_cells": surface_mesh.n_cells, + "bounds": surface_mesh.bounds, + } + data["metadata"] = metadata + data["filename"] = filename + + self.logger.warning(f"Loaded data with {surface_mesh.n_points} points and {surface_mesh.n_cells} cells") + return data + + def _get_output_path(self, filename: str) -> Path: + """Get the final output path for a given filename. + + Args: + filename: Name of the file to process + + Returns: + Path object representing the final output location + """ + raise NotImplementedError("CGNSDataSource only supports reading") + + def _write_impl_temp_file( + self, + data: Dict[str, Any], + output_path: Path, + ) -> None: + """Not implemented - this DataSource only reads.""" + raise NotImplementedError("CGNSDataSource only supports reading") + + def should_skip(self, filename: str) -> bool: + """Never skip files for reading.""" + return False diff --git a/examples/tutorials/etl_cgns_to_numpy/cgns_to_numpy_transformation.py b/examples/tutorials/etl_cgns_to_numpy/cgns_to_numpy_transformation.py new file mode 100644 index 0000000..33c9ef7 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_numpy/cgns_to_numpy_transformation.py @@ -0,0 +1,106 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict + +import numpy as np + +from physicsnemo_curator.etl.data_transformations import DataTransformation +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class CGNSToNumpyTransformation(DataTransformation): + """Transform CGNS data into NumPy array format.""" + + def __init__( + self, cfg: ProcessingConfig, precision: str = "float32" + ): + """Initialize the transformation. + + Args: + cfg: Processing configuration + precision: Data precision for NumPy arrays ('float32' or 'float64') + """ + super().__init__(cfg) + self.dtype = np.float32 if precision == "float32" else np.float64 + + def transform(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform CGNS data to NumPy array format. + + Args: + data: Dictionary from CGNSDataSource.read_file() + + Returns: + Dictionary with NumPy arrays and metadata + """ + self.logger.info(f"Transforming {data['filename']} to NumPy arrays") + + # Get the number of points + num_points = len(data["coordinates"]) + + # Prepare arrays + numpy_data = {} + + # Coordinates (2D array: points x 3 dimensions) + numpy_data["coordinates"] = data["coordinates"].astype(self.dtype) + + # Faces/connectivity (2D array: cells x 3 for triangles) + if "faces" in data: + numpy_data["faces"] = data["faces"].astype(np.int32) + + # Process all point data fields (e.g., pressure, velocity, temperature, etc.) + for field_name, field_data in data.items(): + if field_name in ["coordinates", "faces", "metadata", "filename"]: + continue # Skip already processed fields + + if isinstance(field_data, np.ndarray): + self.logger.info(f"Processing field: {field_name} with shape {field_data.shape}") + + # Convert to specified precision + numpy_data[field_name] = field_data.astype(self.dtype) + + # Build comprehensive metadata + metadata = data.get("metadata", {}) + metadata["num_points"] = num_points + metadata["precision"] = str(self.dtype) + metadata["format"] = "numpy" + + # Add statistics for each field + for field_name, field_data in data.items(): + if field_name in ["coordinates", "faces", "metadata", "filename"]: + continue + + if isinstance(field_data, np.ndarray): + if field_data.ndim == 1: + # Add statistics for scalar fields + metadata[f"{field_name}_min"] = float(np.min(field_data)) + metadata[f"{field_name}_max"] = float(np.max(field_data)) + metadata[f"{field_name}_mean"] = float(np.mean(field_data)) + metadata[f"{field_name}_std"] = float(np.std(field_data)) + elif field_data.ndim == 2: + # For vector fields, compute magnitude statistics + magnitude = np.linalg.norm(field_data, axis=1) + metadata[f"{field_name}_magnitude_max"] = float(np.max(magnitude)) + metadata[f"{field_name}_magnitude_mean"] = float(np.mean(magnitude)) + metadata[f"{field_name}_magnitude_std"] = float(np.std(magnitude)) + + # Store magnitude as a separate field + numpy_data[f"{field_name}_magnitude"] = magnitude.astype(self.dtype) + + numpy_data["metadata"] = metadata + + return numpy_data + diff --git a/examples/tutorials/etl_cgns_to_numpy/generate_sample_data.py b/examples/tutorials/etl_cgns_to_numpy/generate_sample_data.py new file mode 100644 index 0000000..3293bd8 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_numpy/generate_sample_data.py @@ -0,0 +1,120 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import numpy as np +import pyvista as pv + + +def generate_mesh_with_fields(run_number, resolution=20): + """Generate a mesh with simulation fields for one run. + + Args: + run_number: Run identifier to add variation + resolution: Mesh resolution (higher = more points) + + Returns: + pyvista mesh with point data fields + """ + # Create a sphere mesh with some variation based on run number + radius = 1.0 + 0.1 * np.sin(run_number) + mesh = pv.Sphere(radius=radius, theta_resolution=resolution, phi_resolution=resolution) + + # Get points for field generation + points = mesh.points + num_points = mesh.n_points + + # Generate temperature field based on distance from origin + distances = np.linalg.norm(points, axis=1) + temperature = 250.0 + 100.0 * (distances / distances.max()) + np.random.normal(0, 5, num_points) + mesh.point_data["Temperature"] = temperature.astype(np.float32) + + # Generate pressure field with some variation + pressure = 101325.0 + 1000.0 * np.sin(points[:, 2] * np.pi) + np.random.normal(0, 100, num_points) + mesh.point_data["Pressure"] = pressure.astype(np.float32) + + # Generate velocity field (3D vectors) + # Create a rotational flow pattern + velocity = np.zeros((num_points, 3), dtype=np.float32) + velocity[:, 0] = -points[:, 1] * 2.0 + np.random.normal(0, 0.5, num_points) + velocity[:, 1] = points[:, 0] * 2.0 + np.random.normal(0, 0.5, num_points) + velocity[:, 2] = points[:, 2] * 0.5 + np.random.normal(0, 0.3, num_points) + mesh.point_data["Velocity"] = velocity + + # Generate density field + density = 1.2 + 0.2 * np.sin(distances * np.pi * 2) + np.random.normal(0, 0.05, num_points) + mesh.point_data["Density"] = density.astype(np.float32) + + # Generate vorticity magnitude (scalar derived from position) + vorticity = 2.0 * np.ones(num_points) + 0.5 * np.cos(distances * 3) + np.random.normal(0, 0.1, num_points) + mesh.point_data["Vorticity"] = vorticity.astype(np.float32) + + return mesh + + +def create_cgns_file(run_number, output_dir="tutorial_data", resolution=20): + """Create one CGNS file for a simulation run. + + Args: + run_number: Run identifier + output_dir: Directory to save the file + resolution: Mesh resolution + """ + # Create output directory if it doesn't exist + os.makedirs(output_dir, exist_ok=True) + + # Generate mesh with fields + mesh = generate_mesh_with_fields(run_number, resolution) + num_points = mesh.n_points + num_cells = mesh.n_cells + + # Create CGNS filename + filename = f"run_{run_number:03d}.cgns" + filepath = os.path.join(output_dir, filename) + + # Save as CGNS file + # Note: PyVista writes CGNS files through VTK's CGNS writer + mesh.save(filepath) + + # Print summary of generated data + print(f"Created {filepath}:") + print(f" - {num_points} points") + print(f" - {num_cells} cells") + print(f" - Fields: {', '.join(mesh.point_data.keys())}") + + +def main(): + """Generate sample dataset with 5 simulation runs.""" + print("Generating sample CGNS physics simulation dataset...") + print() + + # Generate 5 runs with varying resolution for diversity + resolutions = [15, 20, 18, 22, 20] + + for run_num in range(1, 6): + create_cgns_file(run_num, resolution=resolutions[run_num - 1]) + + print("\nDataset generation complete!") + print("Created 5 CGNS files in the 'tutorial_data/' directory") + print("Each file contains a sphere mesh with temperature, pressure, velocity, density, and vorticity fields") + print("\nYou can now run the ETL pipeline with:") + print(" python run_etl.py etl.source.input_dir=tutorial_data") + + + +if __name__ == "__main__": + main() diff --git a/examples/tutorials/etl_cgns_to_numpy/numpy_data_source.py b/examples/tutorials/etl_cgns_to_numpy/numpy_data_source.py new file mode 100644 index 0000000..8071461 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_numpy/numpy_data_source.py @@ -0,0 +1,110 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from pathlib import Path +from typing import Any, Dict, List + +import numpy as np + +from physicsnemo_curator.etl.data_sources import DataSource +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class NumpyDataSource(DataSource): + """DataSource for writing NumPy arrays to .npz files.""" + + def __init__(self, cfg: ProcessingConfig, output_dir: str): + """Initialize the NumPy data sink. + + Args: + cfg: Processing configuration + output_dir: Directory where .npz files will be written + """ + super().__init__(cfg) + self.output_dir = Path(output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + + def get_file_list(self) -> List[str]: + """Not used for sink-only operations.""" + return [] + + def read_file(self, filename: str) -> Dict[str, Any]: + """Not implemented - this DataSource only writes.""" + raise NotImplementedError("NumpyDataSource only supports writing") + + def _get_output_path(self, filename: str) -> Path: + """Get the final output path for a given filename. + + Args: + filename: Name of the file to process + + Returns: + Path object representing the final output location + """ + return self.output_dir / f"{filename}.npz" + + def _write_impl_temp_file( + self, + data: Dict[str, Any], + output_path: Path, + ) -> None: + """Write data to a NumPy .npz file. + + Args: + data: Dictionary containing NumPy arrays and metadata + output_path: Path where the .npz file should be written + """ + self.logger.info(f"Writing NumPy arrays to {output_path}") + + # Separate metadata from array data + metadata = data.pop("metadata", {}) + filename = data.pop("filename", "unknown") + + # Save all arrays to .npz file (compressed) + # FIX: Use an open file object to prevent numpy from appending .npz to the temp filename + with open(output_path, "wb") as f: + np.savez_compressed(f, **data) + + # Save metadata as separate JSON file + # Note: This writes directly to the final .json path (sidecar file) + metadata_path = output_path.with_suffix(".json") + metadata["filename"] = filename + + with open(metadata_path, "w") as f: + json.dump(metadata, f, indent=2) + + self.logger.info( + f"Saved {len(data)} arrays to {output_path} " + f"and metadata to {metadata_path}" + ) + + def should_skip(self, filename: str) -> bool: + """Check if output file already exists. + + Args: + filename: Name of the file to check + + Returns: + True if the output file already exists and we should skip processing + """ + output_path = self._get_output_path(filename) + exists = output_path.exists() + + if exists: + self.logger.info(f"Skipping {filename} - output already exists") + + return exists \ No newline at end of file diff --git a/examples/tutorials/etl_cgns_to_numpy/run_etl.py b/examples/tutorials/etl_cgns_to_numpy/run_etl.py new file mode 100644 index 0000000..1a06c93 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_numpy/run_etl.py @@ -0,0 +1,93 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""CGNS to NumPy Tutorial ETL pipeline runner. + +This script demonstrates how to instantiate components and use the ETLOrchestrator +to convert CGNS data to NumPy format. + +Usage: + python run_etl.py \\ + etl.source.input_dir=/path/to/cgns_data \\ + etl.sink.output_dir=/path/to/numpy_output +""" + +import os +import sys + +# Add current directory to sys.path so Hydra can import tutorial modules +sys.path.insert(0, os.path.dirname(__file__)) + +import hydra +from hydra.utils import instantiate +from omegaconf import DictConfig + +from physicsnemo_curator.etl.etl_orchestrator import ETLOrchestrator +from physicsnemo_curator.etl.processing_config import ProcessingConfig +from physicsnemo_curator.utils import utils as curator_utils + + +@hydra.main(version_base="1.3", config_path=".", config_name="tutorial_config") +def main(cfg: DictConfig) -> None: + """Run the CGNS to NumPy tutorial ETL pipeline. + + This function: + 1. Sets up multiprocessing context + 2. Creates the processing config + 3. Instantiates all components (source, sink, transformations, validator) + 4. Passes them to the orchestrator + 5. Runs the pipeline + """ + # Set multiprocessing start method + curator_utils.setup_multiprocessing() + + # Create processing config with common settings + processing_config = ProcessingConfig(**cfg.etl.processing) + + # Create and run validator (if configured) + validator = None + if "validator" in cfg.etl: + validator = instantiate( + cfg.etl.validator, + processing_config, + **{k: v for k, v in cfg.etl.source.items() if not k.startswith("_")}, + ) + + # Instantiate source + source = instantiate(cfg.etl.source, processing_config) + + # Instantiate sink + sink = instantiate(cfg.etl.sink, processing_config) + + # Instantiate transformations + # Need to pass processing_config to each transformation, see: + # https://hydra.cc/docs/advanced/instantiate_objects/overview/#recursive-instantiation + cfgs = {k: {"_args_": [processing_config]} for k in cfg.etl.transformations.keys()} + transformations = instantiate(cfg.etl.transformations, **cfgs) + + # Create and run orchestrator with instantiated components + orchestrator = ETLOrchestrator( + source=source, + sink=sink, + transformations=transformations, + processing_config=processing_config, + validator=validator, + ) + orchestrator.run() + + +if __name__ == "__main__": + main() diff --git a/examples/tutorials/etl_cgns_to_numpy/tutorial_config.yaml b/examples/tutorials/etl_cgns_to_numpy/tutorial_config.yaml new file mode 100644 index 0000000..3c2d1a9 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_numpy/tutorial_config.yaml @@ -0,0 +1,50 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Tutorial ETL Pipeline Configuration +# This demonstrates the complete CGNS -> NumPy processing pipeline + +etl: + # Processing settings + processing: + num_processes: 2 # Use 2 processes for this small tutorial dataset + args: {} + + # Validation (runs first) + validator: + _target_: tutorial_validator.TutorialValidator + _convert_: all + input_dir: ??? # Will be provided via command line + validation_level: "fields" # Full validation including data content + + # Source (reads CGNS files) + source: + _target_: cgns_data_source.CGNSDataSource + _convert_: all + input_dir: ??? # Will be provided via command line + + # Transformations (convert to NumPy format) + transformations: + cgns_to_numpy: + _target_: cgns_to_numpy_transformation.CGNSToNumpyTransformation + _convert_: all + precision: "float32" # or "float64" for double precision + + # Sink (writes NumPy .npz files) + sink: + _target_: numpy_data_source.NumpyDataSource + _convert_: all + output_dir: ??? # Will be provided via command line diff --git a/examples/tutorials/etl_cgns_to_numpy/tutorial_validator.py b/examples/tutorials/etl_cgns_to_numpy/tutorial_validator.py new file mode 100644 index 0000000..93478b1 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_numpy/tutorial_validator.py @@ -0,0 +1,263 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import List + +import numpy as np +import pyvista as pv + +from physicsnemo_curator.etl.dataset_validators import ( + DatasetValidator, + ValidationError, + ValidationLevel, +) +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class TutorialValidator(DatasetValidator): + """Validator for CGNS physics simulation dataset.""" + + def __init__( + self, cfg: ProcessingConfig, input_dir: str, validation_level: str = "fields" + ): + """Initialize the validator. + + Args: + cfg: Processing configuration + input_dir: Directory containing CGNS files to validate + validation_level: "structure" or "fields" + """ + super().__init__(cfg) + self.input_dir = Path(input_dir) + self.validation_level = ValidationLevel(validation_level) + + # Define minimum requirements for CGNS mesh + self.min_points = 3 # At least 3 points to form a triangle + self.min_cells = 1 # At least 1 cell + self.expected_spatial_dims = 3 # 3D coordinates (x, y, z) + + def validate(self) -> List[ValidationError]: + """Validate the entire dataset. + + Returns: + List of validation errors (empty if validation passes) + """ + errors = [] + + # Check if input directory exists + if not self.input_dir.exists(): + errors.append( + ValidationError( + path=self.input_dir, + message=f"Input directory does not exist: {self.input_dir}", + level=self.validation_level, + ) + ) + return errors + + # Find all CGNS files + cgns_files = list(self.input_dir.glob("*.cgns")) + + if not cgns_files: + errors.append( + ValidationError( + path=self.input_dir, + message="No CGNS files found in input directory", + level=self.validation_level, + ) + ) + return errors + + # Validate each file + for cgns_file in cgns_files: + file_errors = self.validate_single_item(cgns_file) + errors.extend(file_errors) + + return errors + + def validate_single_item(self, item: Path) -> List[ValidationError]: + """Validate a single CGNS file. + + Args: + item: Path to CGNS file to validate + + Returns: + List of validation errors for this file + """ + errors = [] + + try: + # Try to read the CGNS file + reader = pv.CGNSReader(str(item)) + reader.load_boundary_patch = False + mesh = reader.read() + + # Check if mesh is valid + if not mesh or not mesh[0]: + errors.append( + ValidationError( + path=item, + message="CGNS file contains no valid mesh data", + level=self.validation_level, + ) + ) + return errors + + original_mesh = mesh[0][0] + + # Structure validation + errors.extend(self._validate_structure(original_mesh, item)) + + # Field validation (if requested and structure is valid) + if self.validation_level == ValidationLevel.FIELDS and not errors: + errors.extend(self._validate_fields(original_mesh, item)) + + except Exception as e: + errors.append( + ValidationError( + path=item, + message=f"Failed to open CGNS file: {str(e)}", + level=self.validation_level, + ) + ) + + return errors + + def _validate_structure( + self, mesh: pv.DataSet, file_path: Path + ) -> List[ValidationError]: + """Validate CGNS mesh structure.""" + errors = [] + + # Check mesh has points + if mesh.n_points < self.min_points: + errors.append( + ValidationError( + path=file_path, + message=f"Mesh has too few points: {mesh.n_points} (minimum: {self.min_points})", + level=self.validation_level, + ) + ) + + # Check mesh has cells + if mesh.n_cells < self.min_cells: + errors.append( + ValidationError( + path=file_path, + message=f"Mesh has too few cells: {mesh.n_cells} (minimum: {self.min_cells})", + level=self.validation_level, + ) + ) + + # Check coordinate dimensions + if mesh.points is not None: + if mesh.points.shape[1] != self.expected_spatial_dims: + errors.append( + ValidationError( + path=file_path, + message=f"Mesh points have wrong dimensions: expected {self.expected_spatial_dims}D, got {mesh.points.shape[1]}D", + level=self.validation_level, + ) + ) + + # Check if mesh can be extracted and triangulated (needed for surface processing) + try: + surface_mesh = mesh.extract_surface().triangulate() + if surface_mesh.n_points == 0: + errors.append( + ValidationError( + path=file_path, + message="Extracted surface mesh has no points", + level=self.validation_level, + ) + ) + except Exception as e: + errors.append( + ValidationError( + path=file_path, + message=f"Failed to extract and triangulate surface: {str(e)}", + level=self.validation_level, + ) + ) + + return errors + + def _validate_fields(self, mesh: pv.DataSet, file_path: Path) -> List[ValidationError]: + """Validate field data content.""" + errors = [] + + # Extract surface and triangulate for field validation + try: + surface_mesh = mesh.extract_surface().triangulate() + + # Convert cell data to point data if present + if surface_mesh.cell_data: + surface_mesh = surface_mesh.cell_data_to_point_data() + + except Exception as e: + errors.append( + ValidationError( + path=file_path, + message=f"Failed to process mesh for field validation: {str(e)}", + level=self.validation_level, + ) + ) + return errors + + # Check that mesh has at least some point data fields + if not surface_mesh.point_data: + errors.append( + ValidationError( + path=file_path, + message="Mesh has no point data fields", + level=self.validation_level, + ) + ) + return errors + + # Validate each field + for field_name, field_data in surface_mesh.point_data.items(): + # Check field size matches number of points + if len(field_data) != surface_mesh.n_points: + errors.append( + ValidationError( + path=file_path, + message=f"Field '{field_name}' size ({len(field_data)}) does not match number of points ({surface_mesh.n_points})", + level=self.validation_level, + ) + ) + + # Check for NaN or infinite values + if np.isnan(field_data).any(): + errors.append( + ValidationError( + path=file_path, + message=f"Field '{field_name}' contains NaN values", + level=self.validation_level, + ) + ) + + if np.isinf(field_data).any(): + errors.append( + ValidationError( + path=file_path, + message=f"Field '{field_name}' contains infinite values", + level=self.validation_level, + ) + ) + + return errors diff --git a/examples/tutorials/etl_cgns_to_zarr/README.md b/examples/tutorials/etl_cgns_to_zarr/README.md new file mode 100644 index 0000000..31d307d --- /dev/null +++ b/examples/tutorials/etl_cgns_to_zarr/README.md @@ -0,0 +1,129 @@ +# PhysicsNeMo-Curator Tutorial: CGNS to Zarr + +## Overview + +This tutorial demonstrates how to use the PhysicsNeMo-Curator ETL pipeline to: + +1. Extract physics simulation data from **CGNS (CFD General Notation System)** files. +2. Transform the data into an optimized, AI-model-ready format (Zarr). +3. Write the transformed data to disk efficiently. + +## 1. Create a Dataset + +PhysicsNeMo-Curator works with well-defined formats and schemas. For this tutorial, we define a custom simulation dataset using: + +* **Format**: CGNS (Computational Fluid Dynamics General Notation System) +* **Storage**: Local filesystem +* **Schema**: Each simulation run contains a mesh with the following fields: + +| Field Name | Type | Description | +| --- | --- | --- | +| `coordinates` | `(N, 3) float32` | Spatial coordinates (x, y, z) of mesh points | +| `faces` | `(M, 4) int32` | Mesh connectivity information | +| `Temperature` | `(N,) float32` | Scalar temperature field | +| `Pressure` | `(N,) float32` | Scalar pressure field | +| `Velocity` | `(N, 3) float32` | 3D velocity vector field | +| `Density` | `(N,) float32` | Scalar density field | +| `Vorticity` | `(N,) float32` | Scalar vorticity field | + +### Generate Sample Data + +We have provided a script to generate 5 simulation runs with random physics-like data on a spherical mesh. + +To generate the data: + +```bash +python generate_sample_data.py + +``` + +This will create a `tutorial_data/` directory containing 5 `.cgns` files (e.g., `run_001.cgns`). + +## 2. The ETL Pipeline + +The pipeline consists of four main components orchestrated to process files in parallel. + +### A. Source: `CGNSDataSource` + +* **File**: `cgns_data_source.py` +* **Function**: Reads CGNS files using `pyvista`. It extracts the mesh geometry (`coordinates`, `faces`) and all point data fields (`Temperature`, `Velocity`, etc.). + +### B. Transformation: `CGNSToZarrTransformation` + +* **File**: `cgns_to_zarr_transformation.py` +* **Function**: Converts the raw mesh data into a Zarr-optimized format. +* Applies chunking and compression (Zstd). +* Calculates derived statistics (min, max, mean) for all scalar fields. +* Calculates magnitude arrays and statistics for vector fields (e.g., `Velocity_magnitude`). + + + +### C. Sink: `ZarrDataSource` + +* **File**: `zarr_data_source.py` +* **Function**: Writes the transformed data into individual Zarr stores (directories). It preserves the metadata and structure defined by the transformation. + +### D. Validator: `TutorialValidator` + +* **File**: `tutorial_validator.py` +* **Function**: Validates the input CGNS files before processing. It checks for: +* Valid mesh structure (minimum points and cells). +* Correct spatial dimensions (3D). +* Presence of required field data. +* Data integrity (no NaNs or infinite values). + + + +## 3. Configuration + +The pipeline is configured using Hydra via `tutorial_config.yaml`. + +```yaml +etl: + processing: + num_processes: 2 # Parallel execution + + validator: + _target_: tutorial_validator.TutorialValidator + validation_level: "fields" + + source: + _target_: cgns_data_source.CGNSDataSource + + transformations: + cgns_to_zarr: + _target_: cgns_to_zarr_transformation.CGNSToZarrTransformation + chunk_size: 500 + compression_level: 3 + + sink: + _target_: zarr_data_source.ZarrDataSource + +``` + +## 4. Run the Pipeline + +To run the ETL pipeline, use the `run_etl.py` script. You must specify the input and output directories. + +```bash +python run_etl.py \ + etl.source.input_dir=tutorial_data \ + etl.sink.output_dir=output_zarr + +``` + +## 5. Output + +After execution, the `output_zarr/` directory will contain a separate Zarr store for each run (e.g., `run_001.zarr`). Each store will contain: + +* **Arrays**: +* `coordinates` +* `faces` +* `Temperature`, `Pressure`, `Density`, `Vorticity` +* `Velocity` +* `Velocity_magnitude` (derived) + + +* **Metadata (`.zattrs`)**: +* Simulation statistics (e.g., `Temperature_mean`, `Velocity_magnitude_max`) +* Technical details (`chunk_size`, `compression`) diff --git a/examples/tutorials/etl_cgns_to_zarr/__init__.py b/examples/tutorials/etl_cgns_to_zarr/__init__.py new file mode 100644 index 0000000..aa5ad60 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_zarr/__init__.py @@ -0,0 +1,25 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tutorial ETL pipeline for HDF5 to Zarr conversion.""" + +from . import ( + h5_data_source, # noqa: F401 + h5_to_zarr_transformation, # noqa: F401 + tutorial_config, # noqa: F401 + tutorial_validator, # noqa: F401 + zarr_data_source, # noqa: F401 +) diff --git a/examples/tutorials/etl_cgns_to_zarr/cgns_data_source.py b/examples/tutorials/etl_cgns_to_zarr/cgns_data_source.py new file mode 100644 index 0000000..1da3925 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_zarr/cgns_data_source.py @@ -0,0 +1,158 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import Any, Dict, List + +import numpy as np +import pyvista as pv + +from physicsnemo_curator.etl.data_sources import DataSource +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class CGNSDataSource(DataSource): + """DataSource for reading CGNS physics simulation files.""" + + def __init__(self, cfg: ProcessingConfig, input_dir: str): + """Initialize the CGNS data source. + + Args: + cfg: Processing configuration + input_dir: Directory containing input CGNS files + """ + super().__init__(cfg) + self.input_dir = Path(input_dir) + + if not self.input_dir.exists(): + raise FileNotFoundError(f"Input directory {self.input_dir} does not exist") + + def get_file_list(self) -> List[str]: + """Get list of CGNS files to process. + + Returns: + List of filenames (without extension) to process + """ + # Find all .cgns files and return their base names + cgns_files = list(self.input_dir.glob("*.cgns")) + filenames = [f.stem for f in cgns_files] # Remove .cgns extension + + self.logger.info(f"Found {len(filenames)} CGNS files to process") + return sorted(filenames) + + def _read_cgns_mesh(self, filepath: Path): + """Read a CGNS file and extract the mesh. + + Args: + filepath: Path to the CGNS file + + Returns: + pyvista mesh object or None if reading fails + """ + try: + reader = pv.CGNSReader(str(filepath)) + # Turn off loading the interior mesh + reader.load_boundary_patch = False + mesh = reader.read() + + # Check if the mesh is valid and contains a block to process + if not mesh or not mesh[0]: + self.logger.warning(f"No valid data found in {filepath}. Skipping.") + return None + + original = mesh[0][0] + return original + + except Exception as e: + self.logger.error(f"Error processing file {filepath}: {e}") + return None + + def read_file(self, filename: str) -> Dict[str, Any]: + """Read one CGNS file and extract all data. + + Args: + filename: Base filename (without extension) + + Returns: + Dictionary containing extracted data and metadata + """ + filepath = self.input_dir / f"{filename}.cgns" + if not filepath.exists(): + raise FileNotFoundError(f"File not found: {filepath}") + + self.logger.warning(f"Reading {filepath}") + + # Read the CGNS mesh + original_mesh = self._read_cgns_mesh(filepath) + if original_mesh is None: + raise ValueError(f"Failed to read CGNS file: {filepath}") + + # Extract surface and triangulate + surface_mesh = original_mesh.extract_surface().triangulate() + + # Convert cell data to point data if present + if surface_mesh.cell_data: + self.logger.info("Found cell data. Converting to point data.") + surface_mesh = surface_mesh.cell_data_to_point_data() + + # Build data dictionary + data = {} + + # Extract coordinates (points) + data["coordinates"] = np.array(surface_mesh.points) + + # Extract connectivity/faces information + data["faces"] = np.array(surface_mesh.faces).reshape(-1, 4)[:, 1:] # Remove size prefix + + # Extract all point data fields + for field_name in surface_mesh.point_data.keys(): + data[field_name] = np.array(surface_mesh.point_data[field_name]) + self.logger.info(f"Extracted point data field: {field_name}") + + # Store metadata + metadata = { + "n_points": surface_mesh.n_points, + "n_cells": surface_mesh.n_cells, + "bounds": surface_mesh.bounds, + } + data["metadata"] = metadata + data["filename"] = filename + + self.logger.warning(f"Loaded data with {surface_mesh.n_points} points and {surface_mesh.n_cells} cells") + return data + + def _get_output_path(self, filename: str) -> Path: + """Get the final output path for a given filename. + + Args: + filename: Name of the file to process + + Returns: + Path object representing the final output location + """ + raise NotImplementedError("CGNSDataSource only supports reading") + + def _write_impl_temp_file( + self, + data: Dict[str, Any], + output_path: Path, + ) -> None: + """Not implemented - this DataSource only reads.""" + raise NotImplementedError("CGNSDataSource only supports reading") + + def should_skip(self, filename: str) -> bool: + """Never skip files for reading.""" + return False diff --git a/examples/tutorials/etl_cgns_to_zarr/cgns_to_zarr_transformation.py b/examples/tutorials/etl_cgns_to_zarr/cgns_to_zarr_transformation.py new file mode 100644 index 0000000..0b2805c --- /dev/null +++ b/examples/tutorials/etl_cgns_to_zarr/cgns_to_zarr_transformation.py @@ -0,0 +1,151 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict + +import numpy as np +import zarr + +from physicsnemo_curator.etl.data_transformations import DataTransformation +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class CGNSToZarrTransformation(DataTransformation): + """Transform CGNS data into Zarr-optimized format.""" + + def __init__( + self, cfg: ProcessingConfig, chunk_size: int = 500, compression_level: int = 3 + ): + """Initialize the transformation. + + Args: + cfg: Processing configuration + chunk_size: Chunk size for Zarr arrays (number of points per chunk) + compression_level: Compression level (1-9, higher = more compression) + """ + super().__init__(cfg) + self.chunk_size = chunk_size + self.compression_level = compression_level + + # Set up Zarr 3 compression codec + self.compressor = zarr.codecs.BloscCodec( + cname="zstd", + clevel=self.compression_level, + shuffle=zarr.codecs.BloscShuffle.shuffle, + ) + + def transform(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform CGNS data to Zarr-optimized format. + + Args: + data: Dictionary from CGNSDataSource.read_file() + + Returns: + Dictionary with Zarr-optimized arrays and metadata + """ + self.logger.info(f"Transforming {data['filename']} for Zarr storage") + + # Get the number of points to determine chunking + num_points = len(data["coordinates"]) + + # Calculate optimal chunks (don't exceed chunk_size) + chunk_points = min(self.chunk_size, num_points) + + # Prepare arrays that will be written to Zarr stores + zarr_data = { + "coordinates": {}, + "faces": {}, + } + + # Coordinates (2D array: points x 3 dimensions) + zarr_data["coordinates"] = { + "data": data["coordinates"].astype(np.float32), + "chunks": (chunk_points, 3), + "compressor": self.compressor, + "dtype": np.float32, + } + + # Faces/connectivity (2D array: cells x 3 for triangles) + if "faces" in data: + num_cells = len(data["faces"]) + chunk_cells = min(self.chunk_size, num_cells) + zarr_data["faces"] = { + "data": data["faces"].astype(np.int32), + "chunks": (chunk_cells, 3), + "compressor": self.compressor, + "dtype": np.int32, + } + + # Process all point data fields (e.g., pressure, velocity, temperature, etc.) + for field_name, field_data in data.items(): + if field_name in ["coordinates", "faces", "metadata", "filename"]: + continue # Skip already processed fields + + if isinstance(field_data, np.ndarray): + self.logger.info(f"Processing field: {field_name} with shape {field_data.shape}") + + # Handle scalar fields (1D) + if field_data.ndim == 1: + zarr_data[field_name] = { + "data": field_data.astype(np.float32), + "chunks": (chunk_points,), + "compressor": self.compressor, + "dtype": np.float32, + } + # Handle vector fields (2D) + elif field_data.ndim == 2: + zarr_data[field_name] = { + "data": field_data.astype(np.float32), + "chunks": (chunk_points, field_data.shape[1]), + "compressor": self.compressor, + "dtype": np.float32, + } + + # Add computed metadata useful for Zarr + metadata = data.get("metadata", {}) + metadata["num_points"] = num_points + metadata["chunk_size"] = chunk_points + metadata["compression"] = "zstd" + metadata["compression_level"] = self.compression_level + + # Add statistics for each field + for field_name, field_data in data.items(): + if field_name in ["coordinates", "faces", "metadata", "filename"]: + continue + + if isinstance(field_data, np.ndarray) and field_data.ndim == 1: + # Add statistics for scalar fields + metadata[f"{field_name}_min"] = float(np.min(field_data)) + metadata[f"{field_name}_max"] = float(np.max(field_data)) + metadata[f"{field_name}_mean"] = float(np.mean(field_data)) + elif isinstance(field_data, np.ndarray) and field_data.ndim == 2: + # For vector fields, compute magnitude statistics + magnitude = np.linalg.norm(field_data, axis=1) + metadata[f"{field_name}_magnitude_max"] = float(np.max(magnitude)) + metadata[f"{field_name}_magnitude_mean"] = float(np.mean(magnitude)) + + # Store magnitude as a separate field + zarr_data[f"{field_name}_magnitude"] = { + "data": magnitude.astype(np.float32), + "chunks": (chunk_points,), + "compressor": self.compressor, + "dtype": np.float32, + } + + zarr_data["metadata"] = metadata + + return zarr_data + diff --git a/examples/tutorials/etl_cgns_to_zarr/generate_sample_data.py b/examples/tutorials/etl_cgns_to_zarr/generate_sample_data.py new file mode 100644 index 0000000..3293bd8 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_zarr/generate_sample_data.py @@ -0,0 +1,120 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import numpy as np +import pyvista as pv + + +def generate_mesh_with_fields(run_number, resolution=20): + """Generate a mesh with simulation fields for one run. + + Args: + run_number: Run identifier to add variation + resolution: Mesh resolution (higher = more points) + + Returns: + pyvista mesh with point data fields + """ + # Create a sphere mesh with some variation based on run number + radius = 1.0 + 0.1 * np.sin(run_number) + mesh = pv.Sphere(radius=radius, theta_resolution=resolution, phi_resolution=resolution) + + # Get points for field generation + points = mesh.points + num_points = mesh.n_points + + # Generate temperature field based on distance from origin + distances = np.linalg.norm(points, axis=1) + temperature = 250.0 + 100.0 * (distances / distances.max()) + np.random.normal(0, 5, num_points) + mesh.point_data["Temperature"] = temperature.astype(np.float32) + + # Generate pressure field with some variation + pressure = 101325.0 + 1000.0 * np.sin(points[:, 2] * np.pi) + np.random.normal(0, 100, num_points) + mesh.point_data["Pressure"] = pressure.astype(np.float32) + + # Generate velocity field (3D vectors) + # Create a rotational flow pattern + velocity = np.zeros((num_points, 3), dtype=np.float32) + velocity[:, 0] = -points[:, 1] * 2.0 + np.random.normal(0, 0.5, num_points) + velocity[:, 1] = points[:, 0] * 2.0 + np.random.normal(0, 0.5, num_points) + velocity[:, 2] = points[:, 2] * 0.5 + np.random.normal(0, 0.3, num_points) + mesh.point_data["Velocity"] = velocity + + # Generate density field + density = 1.2 + 0.2 * np.sin(distances * np.pi * 2) + np.random.normal(0, 0.05, num_points) + mesh.point_data["Density"] = density.astype(np.float32) + + # Generate vorticity magnitude (scalar derived from position) + vorticity = 2.0 * np.ones(num_points) + 0.5 * np.cos(distances * 3) + np.random.normal(0, 0.1, num_points) + mesh.point_data["Vorticity"] = vorticity.astype(np.float32) + + return mesh + + +def create_cgns_file(run_number, output_dir="tutorial_data", resolution=20): + """Create one CGNS file for a simulation run. + + Args: + run_number: Run identifier + output_dir: Directory to save the file + resolution: Mesh resolution + """ + # Create output directory if it doesn't exist + os.makedirs(output_dir, exist_ok=True) + + # Generate mesh with fields + mesh = generate_mesh_with_fields(run_number, resolution) + num_points = mesh.n_points + num_cells = mesh.n_cells + + # Create CGNS filename + filename = f"run_{run_number:03d}.cgns" + filepath = os.path.join(output_dir, filename) + + # Save as CGNS file + # Note: PyVista writes CGNS files through VTK's CGNS writer + mesh.save(filepath) + + # Print summary of generated data + print(f"Created {filepath}:") + print(f" - {num_points} points") + print(f" - {num_cells} cells") + print(f" - Fields: {', '.join(mesh.point_data.keys())}") + + +def main(): + """Generate sample dataset with 5 simulation runs.""" + print("Generating sample CGNS physics simulation dataset...") + print() + + # Generate 5 runs with varying resolution for diversity + resolutions = [15, 20, 18, 22, 20] + + for run_num in range(1, 6): + create_cgns_file(run_num, resolution=resolutions[run_num - 1]) + + print("\nDataset generation complete!") + print("Created 5 CGNS files in the 'tutorial_data/' directory") + print("Each file contains a sphere mesh with temperature, pressure, velocity, density, and vorticity fields") + print("\nYou can now run the ETL pipeline with:") + print(" python run_etl.py etl.source.input_dir=tutorial_data") + + + +if __name__ == "__main__": + main() diff --git a/examples/tutorials/etl_cgns_to_zarr/run_etl.py b/examples/tutorials/etl_cgns_to_zarr/run_etl.py new file mode 100644 index 0000000..0e632ce --- /dev/null +++ b/examples/tutorials/etl_cgns_to_zarr/run_etl.py @@ -0,0 +1,93 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""CGNS to Zarr Tutorial ETL pipeline runner. + +This script demonstrates how to instantiate components and use the ETLOrchestrator +to convert CGNS data to Zarr format. + +Usage: + python run_etl.py \\ + etl.source.input_dir=/path/to/cgns_data \\ + etl.sink.output_dir=/path/to/zarr_output +""" + +import os +import sys + +# Add current directory to sys.path so Hydra can import tutorial modules +sys.path.insert(0, os.path.dirname(__file__)) + +import hydra +from hydra.utils import instantiate +from omegaconf import DictConfig + +from physicsnemo_curator.etl.etl_orchestrator import ETLOrchestrator +from physicsnemo_curator.etl.processing_config import ProcessingConfig +from physicsnemo_curator.utils import utils as curator_utils + + +@hydra.main(version_base="1.3", config_path=".", config_name="tutorial_config") +def main(cfg: DictConfig) -> None: + """Run the CGNS to Zarr tutorial ETL pipeline. + + This function: + 1. Sets up multiprocessing context + 2. Creates the processing config + 3. Instantiates all components (source, sink, transformations, validator) + 4. Passes them to the orchestrator + 5. Runs the pipeline + """ + # Set multiprocessing start method + curator_utils.setup_multiprocessing() + + # Create processing config with common settings + processing_config = ProcessingConfig(**cfg.etl.processing) + + # Create and run validator (if configured) + validator = None + if "validator" in cfg.etl: + validator = instantiate( + cfg.etl.validator, + processing_config, + **{k: v for k, v in cfg.etl.source.items() if not k.startswith("_")}, + ) + + # Instantiate source + source = instantiate(cfg.etl.source, processing_config) + + # Instantiate sink + sink = instantiate(cfg.etl.sink, processing_config) + + # Instantiate transformations + # Need to pass processing_config to each transformation, see: + # https://hydra.cc/docs/advanced/instantiate_objects/overview/#recursive-instantiation + cfgs = {k: {"_args_": [processing_config]} for k in cfg.etl.transformations.keys()} + transformations = instantiate(cfg.etl.transformations, **cfgs) + + # Create and run orchestrator with instantiated components + orchestrator = ETLOrchestrator( + source=source, + sink=sink, + transformations=transformations, + processing_config=processing_config, + validator=validator, + ) + orchestrator.run() + + +if __name__ == "__main__": + main() diff --git a/examples/tutorials/etl_cgns_to_zarr/tutorial_config.yaml b/examples/tutorials/etl_cgns_to_zarr/tutorial_config.yaml new file mode 100644 index 0000000..f305385 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_zarr/tutorial_config.yaml @@ -0,0 +1,51 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Tutorial ETL Pipeline Configuration +# This demonstrates the complete CGNS -> Zarr processing pipeline + +etl: + # Processing settings + processing: + num_processes: 2 # Use 2 processes for this small tutorial dataset + args: {} + + # Validation (runs first) + validator: + _target_: tutorial_validator.TutorialValidator + _convert_: all + input_dir: ??? # Will be provided via command line + validation_level: "fields" # Full validation including data content + + # Source (reads CGNS files) + source: + _target_: cgns_data_source.CGNSDataSource + _convert_: all + input_dir: ??? # Will be provided via command line + + # Transformations (convert to Zarr format) + transformations: + cgns_to_zarr: + _target_: cgns_to_zarr_transformation.CGNSToZarrTransformation + _convert_: all + chunk_size: 500 + compression_level: 3 + + # Sink (writes Zarr stores) + sink: + _target_: zarr_data_source.ZarrDataSource + _convert_: all + output_dir: ??? # Will be provided via command line diff --git a/examples/tutorials/etl_cgns_to_zarr/tutorial_validator.py b/examples/tutorials/etl_cgns_to_zarr/tutorial_validator.py new file mode 100644 index 0000000..93478b1 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_zarr/tutorial_validator.py @@ -0,0 +1,263 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import List + +import numpy as np +import pyvista as pv + +from physicsnemo_curator.etl.dataset_validators import ( + DatasetValidator, + ValidationError, + ValidationLevel, +) +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class TutorialValidator(DatasetValidator): + """Validator for CGNS physics simulation dataset.""" + + def __init__( + self, cfg: ProcessingConfig, input_dir: str, validation_level: str = "fields" + ): + """Initialize the validator. + + Args: + cfg: Processing configuration + input_dir: Directory containing CGNS files to validate + validation_level: "structure" or "fields" + """ + super().__init__(cfg) + self.input_dir = Path(input_dir) + self.validation_level = ValidationLevel(validation_level) + + # Define minimum requirements for CGNS mesh + self.min_points = 3 # At least 3 points to form a triangle + self.min_cells = 1 # At least 1 cell + self.expected_spatial_dims = 3 # 3D coordinates (x, y, z) + + def validate(self) -> List[ValidationError]: + """Validate the entire dataset. + + Returns: + List of validation errors (empty if validation passes) + """ + errors = [] + + # Check if input directory exists + if not self.input_dir.exists(): + errors.append( + ValidationError( + path=self.input_dir, + message=f"Input directory does not exist: {self.input_dir}", + level=self.validation_level, + ) + ) + return errors + + # Find all CGNS files + cgns_files = list(self.input_dir.glob("*.cgns")) + + if not cgns_files: + errors.append( + ValidationError( + path=self.input_dir, + message="No CGNS files found in input directory", + level=self.validation_level, + ) + ) + return errors + + # Validate each file + for cgns_file in cgns_files: + file_errors = self.validate_single_item(cgns_file) + errors.extend(file_errors) + + return errors + + def validate_single_item(self, item: Path) -> List[ValidationError]: + """Validate a single CGNS file. + + Args: + item: Path to CGNS file to validate + + Returns: + List of validation errors for this file + """ + errors = [] + + try: + # Try to read the CGNS file + reader = pv.CGNSReader(str(item)) + reader.load_boundary_patch = False + mesh = reader.read() + + # Check if mesh is valid + if not mesh or not mesh[0]: + errors.append( + ValidationError( + path=item, + message="CGNS file contains no valid mesh data", + level=self.validation_level, + ) + ) + return errors + + original_mesh = mesh[0][0] + + # Structure validation + errors.extend(self._validate_structure(original_mesh, item)) + + # Field validation (if requested and structure is valid) + if self.validation_level == ValidationLevel.FIELDS and not errors: + errors.extend(self._validate_fields(original_mesh, item)) + + except Exception as e: + errors.append( + ValidationError( + path=item, + message=f"Failed to open CGNS file: {str(e)}", + level=self.validation_level, + ) + ) + + return errors + + def _validate_structure( + self, mesh: pv.DataSet, file_path: Path + ) -> List[ValidationError]: + """Validate CGNS mesh structure.""" + errors = [] + + # Check mesh has points + if mesh.n_points < self.min_points: + errors.append( + ValidationError( + path=file_path, + message=f"Mesh has too few points: {mesh.n_points} (minimum: {self.min_points})", + level=self.validation_level, + ) + ) + + # Check mesh has cells + if mesh.n_cells < self.min_cells: + errors.append( + ValidationError( + path=file_path, + message=f"Mesh has too few cells: {mesh.n_cells} (minimum: {self.min_cells})", + level=self.validation_level, + ) + ) + + # Check coordinate dimensions + if mesh.points is not None: + if mesh.points.shape[1] != self.expected_spatial_dims: + errors.append( + ValidationError( + path=file_path, + message=f"Mesh points have wrong dimensions: expected {self.expected_spatial_dims}D, got {mesh.points.shape[1]}D", + level=self.validation_level, + ) + ) + + # Check if mesh can be extracted and triangulated (needed for surface processing) + try: + surface_mesh = mesh.extract_surface().triangulate() + if surface_mesh.n_points == 0: + errors.append( + ValidationError( + path=file_path, + message="Extracted surface mesh has no points", + level=self.validation_level, + ) + ) + except Exception as e: + errors.append( + ValidationError( + path=file_path, + message=f"Failed to extract and triangulate surface: {str(e)}", + level=self.validation_level, + ) + ) + + return errors + + def _validate_fields(self, mesh: pv.DataSet, file_path: Path) -> List[ValidationError]: + """Validate field data content.""" + errors = [] + + # Extract surface and triangulate for field validation + try: + surface_mesh = mesh.extract_surface().triangulate() + + # Convert cell data to point data if present + if surface_mesh.cell_data: + surface_mesh = surface_mesh.cell_data_to_point_data() + + except Exception as e: + errors.append( + ValidationError( + path=file_path, + message=f"Failed to process mesh for field validation: {str(e)}", + level=self.validation_level, + ) + ) + return errors + + # Check that mesh has at least some point data fields + if not surface_mesh.point_data: + errors.append( + ValidationError( + path=file_path, + message="Mesh has no point data fields", + level=self.validation_level, + ) + ) + return errors + + # Validate each field + for field_name, field_data in surface_mesh.point_data.items(): + # Check field size matches number of points + if len(field_data) != surface_mesh.n_points: + errors.append( + ValidationError( + path=file_path, + message=f"Field '{field_name}' size ({len(field_data)}) does not match number of points ({surface_mesh.n_points})", + level=self.validation_level, + ) + ) + + # Check for NaN or infinite values + if np.isnan(field_data).any(): + errors.append( + ValidationError( + path=file_path, + message=f"Field '{field_name}' contains NaN values", + level=self.validation_level, + ) + ) + + if np.isinf(field_data).any(): + errors.append( + ValidationError( + path=file_path, + message=f"Field '{field_name}' contains infinite values", + level=self.validation_level, + ) + ) + + return errors diff --git a/examples/tutorials/etl_cgns_to_zarr/zarr_data_source.py b/examples/tutorials/etl_cgns_to_zarr/zarr_data_source.py new file mode 100644 index 0000000..176f7f0 --- /dev/null +++ b/examples/tutorials/etl_cgns_to_zarr/zarr_data_source.py @@ -0,0 +1,118 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import Any, Dict, List + +import zarr +from zarr.storage import LocalStore + +from physicsnemo_curator.etl.data_sources import DataSource +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class ZarrDataSource(DataSource): + """DataSource for writing to Zarr stores.""" + + def __init__(self, cfg: ProcessingConfig, output_dir: str): + """Initialize the Zarr data source. + + Args: + cfg: Processing configuration + output_dir: Directory to write Zarr stores + """ + super().__init__(cfg) + self.output_dir = Path(output_dir) + + # Create output directory if it doesn't exist + self.output_dir.mkdir(parents=True, exist_ok=True) + + def get_file_list(self) -> List[str]: + """Not implemented - this DataSource only writes.""" + raise NotImplementedError("ZarrDataSource only supports writing") + + def read_file(self, filename: str) -> Dict[str, Any]: + """Not implemented - this DataSource only writes.""" + raise NotImplementedError("ZarrDataSource only supports writing") + + def _get_output_path(self, filename: str) -> Path: + """Get the output path for a given filename. + + Args: + filename: Name of the file to process + + Returns: + Path object representing the output location. + """ + return self.output_dir / f"{filename}.zarr" + + def _write_impl_temp_file(self, data: Dict[str, Any], output_path: Path) -> None: + """ + Implement actual data writing logic to a temporary Zarr store. + ... + """ + # Create Zarr store + self.logger.info(f"Creating Zarr store: {output_path}") + store = LocalStore(output_path) + root = zarr.open_group(store=store, mode="w") + + # Store metadata as root attributes + if "metadata" in data: + for key, value in data["metadata"].items(): + if hasattr(value, "item"): # numpy scalar + value = value.item() + root.attrs[key] = value + data.pop("metadata") + + # Write all arrays from the transformation + for array_name, array_info in data.items(): + root.create_array( + name=array_name, + data=array_info["data"], + chunks=array_info["chunks"], + compressors=( + array_info["compressor"] if array_info["compressor"] else None + ), + # REMOVED: dtype=array_info["dtype"], + ) + + # Add some store-level metadata + root.attrs["zarr_format"] = 3 + root.attrs["created_by"] = "physicsnemo-curator-tutorial" + + # Something weird is happening here. + # If this error occurs, the stores are created and we move to the next one. + # If this error does NOT occur, we seem to skip all the remaining files. + # Debug with Alexey. + self.logger.info("Successfully created Zarr store") + + def should_skip(self, filename: str) -> bool: + """Check if we should skip writing this store. + + Args: + filename: Base filename to check + + Returns: + True if store should be skipped (already exists) + """ + store_path = self.output_dir / f"{filename}.zarr" + exists = store_path.exists() + + if exists: + self.logger.info(f"Skipping {filename} - Zarr store already exists") + return True + + return False diff --git a/examples/tutorials/etl_ensight_to_numpy/README.md b/examples/tutorials/etl_ensight_to_numpy/README.md new file mode 100644 index 0000000..423f574 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_numpy/README.md @@ -0,0 +1,144 @@ +# PhysicsNeMo-Curator Tutorial: EnSight Gold to NumPy + +## Overview + +This tutorial demonstrates how to use the PhysicsNeMo-Curator ETL pipeline to: + +1. Extract physics simulation data from **EnSight Gold** (`.case`) files. +2. Transform the data into standard **NumPy arrays** with configurable precision. +3. Write the processed data to efficient, compressed `.npz` files with sidecar metadata. + +## 1. Create a Dataset + +PhysicsNeMo-Curator works with well-defined formats and schemas. For this tutorial, we define a custom simulation dataset using: + +* **Format**: EnSight Gold (a common CFD post-processing format). +* **Storage**: Local filesystem (consisting of a `.case` file and associated geometry/variable files). +* **Schema**: Each simulation run contains a mesh with the following fields: + +| Field Name | Type | Description | +| --- | --- | --- | +| `coordinates` | `(N, 3)` | Spatial coordinates (x, y, z) of mesh points | +| `faces` | `(M, 4)` | Mesh connectivity (triangulated surface) | +| `Temperature` | `(N,)` | Scalar temperature field | +| `Pressure` | `(N,)` | Scalar pressure field | +| `Velocity` | `(N, 3)` | 3D velocity vector field | +| `Density` | `(N,)` | Scalar density field | +| `Vorticity` | `(N,)` | Scalar vorticity field | + +### Generate Sample Data + +We have provided a script to generate 5 simulation runs with random physics-like data on a spherical mesh. + +To generate the data: + +```bash +python generate_sample_data.py + +``` + +This will create a `tutorial_data/` directory containing 5 sets of EnSight Gold files (e.g., `run_001.case` and its data files). + +## 2. The ETL Pipeline + +The pipeline consists of four main components orchestrated to process files in parallel. + +### A. Source: `EnSightDataSource` + +* **File**: `ensight_data_source.py` +* **Function**: Reads EnSight Gold files using `vtk` and `pyvista`. +* **MultiBlock Handling**: Automatically flattens complex MultiBlock datasets often found in EnSight exports. +* **Surface Extraction**: Extracts and merges surfaces from multiple parts. +* **Variable Handling**: Handles inconsistent variables across blocks by padding missing fields with zeros during the merge process. + + + +### B. Transformation: `EnSightToNumpyTransformation` + +* **File**: `ensight_to_numpy_transformation.py` +* **Function**: Converts raw EnSight data into standard NumPy arrays. +* **Precision Control**: Configurable to output `float32` (default) or `float64`. +* **Vector Handling**: Automatically computes magnitude arrays for 2D/vector fields (e.g., `Velocity` -> `Velocity_magnitude`). +* **Statistics**: Calculates comprehensive statistics (min, max, mean, std) for all fields. + + + +### C. Sink: `NumpyDataSource` + +* **File**: `numpy_data_source.py` +* **Function**: Writes the transformed data to disk. +* **Data**: Saved as compressed `.npz` files (using `np.savez_compressed`). +* **Metadata**: Saved as separate `.json` sidecar files containing file info and calculated statistics. + + + +### D. Validator: `EnSightTutorialValidator` + +* **File**: `tutorial_validator.py` +* **Function**: Validates the input EnSight files before processing. It checks for: +* **Readability**: Ensures the `.case` file and its dependencies can be opened. +* **Structure**: Verifies minimum point/cell counts and 3D spatial dimensions. +* **Data Integrity**: Checks for the presence of point data and ensures no NaNs or infinite values exist. + + + +## 3. Configuration + +The pipeline is configured using Hydra via `tutorial_config.yaml`. + +```yaml +etl: + processing: + num_processes: 2 # Parallel execution + + validator: + _target_: tutorial_validator.EnSightTutorialValidator + validation_level: "fields" + + source: + _target_: ensight_data_source.EnSightDataSource + + transformations: + ensight_to_numpy: + _target_: ensight_to_numpy_transformation.EnSightToNumpyTransformation + precision: "float32" # or "float64" + + sink: + _target_: numpy_data_source.NumpyDataSource + +``` + +## 4. Run the Pipeline + +To run the ETL pipeline, use the `run_etl.py` script. You must specify the input and output directories. + +```bash +python run_etl.py \ + etl.source.input_dir=tutorial_data \ + etl.sink.output_dir=output_numpy + +``` + +**Note:** If you are processing a large dataset where validation takes too long, you can skip the validation step by adding `~etl.validator` to the command: + +```bash +python run_etl.py etl.source.input_dir=... etl.sink.output_dir=... ~etl.validator + +``` + +## 5. Output + +After execution, the `output_numpy/` directory will contain paired files for each run: + +1. **`run_001.npz`**: A compressed archive containing the NumPy arrays (`coordinates`, `faces`, `Temperature`, `Velocity`, etc.). +2. **`run_001.json`**: A JSON file containing metadata, such as: +```json +{ + "num_points": 1024, + "precision": "", + "Temperature_mean": 300.5, + "Temperature_std": 12.1, + "Velocity_magnitude_max": 15.2 +} + +``` \ No newline at end of file diff --git a/examples/tutorials/etl_ensight_to_numpy/__init__.py b/examples/tutorials/etl_ensight_to_numpy/__init__.py new file mode 100644 index 0000000..aa5ad60 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_numpy/__init__.py @@ -0,0 +1,25 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tutorial ETL pipeline for HDF5 to Zarr conversion.""" + +from . import ( + h5_data_source, # noqa: F401 + h5_to_zarr_transformation, # noqa: F401 + tutorial_config, # noqa: F401 + tutorial_validator, # noqa: F401 + zarr_data_source, # noqa: F401 +) diff --git a/examples/tutorials/etl_ensight_to_numpy/ensight_data_source.py b/examples/tutorials/etl_ensight_to_numpy/ensight_data_source.py new file mode 100644 index 0000000..8ff71b5 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_numpy/ensight_data_source.py @@ -0,0 +1,154 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import Any, Dict, List + +import numpy as np +import pyvista as pv +import vtk + +from physicsnemo_curator.etl.data_sources import DataSource +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class EnSightDataSource(DataSource): + """Drop-in replacement for CGNSDataSource to read EnSight Gold .case files.""" + + def __init__(self, cfg: ProcessingConfig, input_dir: str): + super().__init__(cfg) + self.input_dir = Path(input_dir) + if not self.input_dir.exists(): + raise FileNotFoundError(f"Input directory {self.input_dir} does not exist") + + def get_file_list(self) -> List[str]: + """Get list of .case files to process.""" + case_files = list(self.input_dir.glob("*.case")) + filenames = [f.stem for f in case_files] + self.logger.info(f"Found {len(filenames)} EnSight files to process") + return sorted(filenames) + + def _read_ensight(self, filepath: Path) -> pv.MultiBlock: + """Read an EnSight Gold file and return a MultiBlock dataset.""" + try: + reader = vtk.vtkEnSightGoldBinaryReader() + reader.SetCaseFileName(str(filepath)) + reader.ReadAllVariablesOn() + reader.Update() + return pv.wrap(reader.GetOutput()) + except Exception as e: + self.logger.error(f"Error reading EnSight file {filepath}: {e}") + return None + + def _flatten_multiblock(self, dataset: pv.MultiBlock) -> List[pv.DataSet]: + """Recursively flatten MultiBlock into a list of non-empty blocks.""" + blocks = [] + for block in dataset: + if isinstance(block, pv.MultiBlock): + blocks.extend(self._flatten_multiblock(block)) + elif block and block.n_points > 0 and block.n_cells > 0: + blocks.append(block) + return blocks + + def _extract_surface_mesh(self, dataset: pv.MultiBlock) -> pv.PolyData: + """Extract a clean surface mesh handling inconsistent variables across blocks.""" + blocks = self._flatten_multiblock(dataset) + + if not blocks: + raise RuntimeError("No valid blocks found in EnSight dataset") + + # Separate blocks into full vs partial (having >1 point data field) + blocks_full, blocks_partial = [], [] + for block in blocks: + if len(block.point_data.keys()) > 1: + blocks_full.append(block) + else: + blocks_partial.append(block) + + surface_full = pv.merge([b.extract_surface() for b in blocks_full]) if blocks_full else None + surface_partial = pv.merge([b.extract_surface() for b in blocks_partial]) if blocks_partial else None + + if surface_full is None and surface_partial is None: + raise RuntimeError("No valid surfaces could be extracted") + + if surface_full is None: + return surface_partial.triangulate() + if surface_partial is None: + return surface_full.triangulate() + + # Fill missing variables in partial blocks + full_arrays = surface_full.point_data + partial_arrays = surface_partial.point_data + missing = set(full_arrays.keys()) - set(partial_arrays.keys()) + for name in missing: + src = full_arrays[name] + placeholder = np.zeros((surface_partial.n_points, src.shape[1])) if src.ndim > 1 else np.zeros(surface_partial.n_points) + surface_partial.point_data[name] = placeholder + self.logger.debug(f"Added placeholder for missing variable '{name}'") + + # Merge surfaces + final_mesh = surface_full.merge(surface_partial) + return final_mesh.triangulate() + + def _read_mesh(self, filename: str) -> pv.PolyData: + """Read a single EnSight file and return the triangulated surface mesh.""" + filepath = self.input_dir / f"{filename}.case" + if not filepath.exists(): + raise FileNotFoundError(f"File not found: {filepath}") + + self.logger.warning(f"Reading {filepath}") + dataset = self._read_ensight(filepath) + if dataset is None: + raise ValueError(f"Failed to read EnSight file: {filepath}") + + surface_mesh = self._extract_surface_mesh(dataset) + + # Convert cell data to point data if present + if surface_mesh.cell_data: + surface_mesh = surface_mesh.cell_data_to_point_data() + + return surface_mesh + + def read_file(self, filename: str) -> Dict[str, Any]: + """Read one EnSight file and extract all data in the same format as CGNSDataSource.""" + surface_mesh = self._read_mesh(filename) + + data = { + "coordinates": np.array(surface_mesh.points), + "faces": np.array(surface_mesh.faces).reshape(-1, 4)[:, 1:], # triangulated + "metadata": { + "n_points": surface_mesh.n_points, + "n_cells": surface_mesh.n_cells, + "bounds": surface_mesh.bounds, + }, + "filename": filename + } + + for field_name in surface_mesh.point_data.keys(): + data[field_name] = np.array(surface_mesh.point_data[field_name]) + self.logger.info(f"Extracted point data field: {field_name}") + + self.logger.warning(f"Loaded data with {surface_mesh.n_points} points and {surface_mesh.n_cells} cells") + return data + + def _get_output_path(self, filename: str) -> Path: + raise NotImplementedError("EnSightDataSource only supports reading") + + def _write_impl_temp_file(self, data: Dict[str, Any], output_path: Path) -> None: + raise NotImplementedError("EnSightDataSource only supports reading") + + def should_skip(self, filename: str) -> bool: + return False diff --git a/examples/tutorials/etl_ensight_to_numpy/ensight_to_numpy_transformation.py b/examples/tutorials/etl_ensight_to_numpy/ensight_to_numpy_transformation.py new file mode 100644 index 0000000..ebc6f26 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_numpy/ensight_to_numpy_transformation.py @@ -0,0 +1,84 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict + +import numpy as np + +from physicsnemo_curator.etl.data_transformations import DataTransformation +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class EnSightToNumpyTransformation(DataTransformation): + """Transform EnSight Gold data into NumPy array format.""" + + def __init__( + self, cfg: ProcessingConfig, precision: str = "float32" + ): + super().__init__(cfg) + self.dtype = np.float32 if precision == "float32" else np.float64 + + def transform(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform EnSight data to NumPy array format.""" + self.logger.info(f"Transforming {data['filename']} to NumPy arrays") + + num_points = len(data["coordinates"]) + numpy_data = {} + + # Coordinates + numpy_data["coordinates"] = data["coordinates"].astype(self.dtype) + + # Faces/connectivity + if "faces" in data: + numpy_data["faces"] = data["faces"].astype(np.int32) + + # Process all point data fields + for field_name, field_data in data.items(): + if field_name in ["coordinates", "faces", "metadata", "filename"]: + continue + if isinstance(field_data, np.ndarray): + self.logger.info(f"Processing field: {field_name} with shape {field_data.shape}") + numpy_data[field_name] = field_data.astype(self.dtype) + + # Build metadata + metadata = data.get("metadata", {}) + metadata.update({ + "num_points": num_points, + "precision": str(self.dtype), + "format": "numpy", + }) + + # Add statistics for each field + for field_name, field_data in data.items(): + if field_name in ["coordinates", "faces", "metadata", "filename"]: + continue + if isinstance(field_data, np.ndarray): + if field_data.ndim == 1: + metadata[f"{field_name}_min"] = float(np.min(field_data)) + metadata[f"{field_name}_max"] = float(np.max(field_data)) + metadata[f"{field_name}_mean"] = float(np.mean(field_data)) + metadata[f"{field_name}_std"] = float(np.std(field_data)) + elif field_data.ndim == 2: + magnitude = np.linalg.norm(field_data, axis=1) + metadata[f"{field_name}_magnitude_max"] = float(np.max(magnitude)) + metadata[f"{field_name}_magnitude_mean"] = float(np.mean(magnitude)) + metadata[f"{field_name}_magnitude_std"] = float(np.std(magnitude)) + numpy_data[f"{field_name}_magnitude"] = magnitude.astype(self.dtype) + + numpy_data["metadata"] = metadata + return numpy_data + + diff --git a/examples/tutorials/etl_ensight_to_numpy/generate_sample_data.py b/examples/tutorials/etl_ensight_to_numpy/generate_sample_data.py new file mode 100644 index 0000000..1428404 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_numpy/generate_sample_data.py @@ -0,0 +1,77 @@ +import os +import numpy as np +import pyvista as pv +from vtk import vtkEnSightWriter + +def generate_mesh_with_fields(run_number, resolution=20): + """Generate a mesh with simulation fields for one run.""" + radius = 1.0 + 0.1 * np.sin(run_number) + mesh = pv.Sphere(radius=radius, theta_resolution=resolution, phi_resolution=resolution) + + points = mesh.points + num_points = mesh.n_points + + distances = np.linalg.norm(points, axis=1) + temperature = 250.0 + 100.0 * (distances / distances.max()) + np.random.normal(0, 5, num_points) + mesh.point_data["Temperature"] = temperature.astype(np.float32) + + pressure = 101325.0 + 1000.0 * np.sin(points[:, 2] * np.pi) + np.random.normal(0, 100, num_points) + mesh.point_data["Pressure"] = pressure.astype(np.float32) + + velocity = np.zeros((num_points, 3), dtype=np.float32) + velocity[:, 0] = -points[:, 1] * 2.0 + np.random.normal(0, 0.5, num_points) + velocity[:, 1] = points[:, 0] * 2.0 + np.random.normal(0, 0.5, num_points) + velocity[:, 2] = points[:, 2] * 0.5 + np.random.normal(0, 0.3, num_points) + mesh.point_data["Velocity"] = velocity + + density = 1.2 + 0.2 * np.sin(distances * np.pi * 2) + np.random.normal(0, 0.05, num_points) + mesh.point_data["Density"] = density.astype(np.float32) + + vorticity = 2.0 * np.ones(num_points) + 0.5 * np.cos(distances * 3) + np.random.normal(0, 0.1, num_points) + mesh.point_data["Vorticity"] = vorticity.astype(np.float32) + + return mesh + +def create_ensight_case_file(run_number, output_dir="tutorial_data", resolution=20): + """Create one EnSight Gold case file for a simulation run.""" + os.makedirs(output_dir, exist_ok=True) + + mesh = generate_mesh_with_fields(run_number, resolution) + num_points = mesh.n_points + num_cells = mesh.n_cells + + # Create base filename + base_name = f"run_{run_number:03d}" + case_file = os.path.join(output_dir, f"{base_name}.case") + + # PyVista VTK EnSightWriter requires vtk.vtkUnstructuredGrid + # Wrap the mesh as unstructured grid if necessary + ug = pv.wrap(mesh) + + # Write EnSight Gold binary files + writer = vtkEnSightWriter() + writer.SetFileName(case_file) + writer.SetInputData(ug) + writer.SetCaseFileName(case_file) + writer.WriteAllVariablesOn() + writer.Update() + + print(f"Created {case_file}:") + print(f" - {num_points} points") + print(f" - {num_cells} cells") + print(f" - Fields: {', '.join(mesh.point_data.keys())}") + +def main(): + print("Generating sample EnSight Gold physics simulation dataset...\n") + resolutions = [15, 20, 18, 22, 20] + + for run_num in range(1, 6): + create_ensight_case_file(run_num, resolution=resolutions[run_num - 1]) + + print("\nDataset generation complete!") + print("Created 5 EnSight Gold .case files in the 'tutorial_data/' directory") + print("Each file contains a sphere mesh with Temperature, Pressure, Velocity, Density, and Vorticity fields") + print("\nYou can now run the ETL pipeline with your EnSightDataSource") + +if __name__ == "__main__": + main() diff --git a/examples/tutorials/etl_ensight_to_numpy/numpy_data_source.py b/examples/tutorials/etl_ensight_to_numpy/numpy_data_source.py new file mode 100644 index 0000000..8071461 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_numpy/numpy_data_source.py @@ -0,0 +1,110 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from pathlib import Path +from typing import Any, Dict, List + +import numpy as np + +from physicsnemo_curator.etl.data_sources import DataSource +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class NumpyDataSource(DataSource): + """DataSource for writing NumPy arrays to .npz files.""" + + def __init__(self, cfg: ProcessingConfig, output_dir: str): + """Initialize the NumPy data sink. + + Args: + cfg: Processing configuration + output_dir: Directory where .npz files will be written + """ + super().__init__(cfg) + self.output_dir = Path(output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + + def get_file_list(self) -> List[str]: + """Not used for sink-only operations.""" + return [] + + def read_file(self, filename: str) -> Dict[str, Any]: + """Not implemented - this DataSource only writes.""" + raise NotImplementedError("NumpyDataSource only supports writing") + + def _get_output_path(self, filename: str) -> Path: + """Get the final output path for a given filename. + + Args: + filename: Name of the file to process + + Returns: + Path object representing the final output location + """ + return self.output_dir / f"{filename}.npz" + + def _write_impl_temp_file( + self, + data: Dict[str, Any], + output_path: Path, + ) -> None: + """Write data to a NumPy .npz file. + + Args: + data: Dictionary containing NumPy arrays and metadata + output_path: Path where the .npz file should be written + """ + self.logger.info(f"Writing NumPy arrays to {output_path}") + + # Separate metadata from array data + metadata = data.pop("metadata", {}) + filename = data.pop("filename", "unknown") + + # Save all arrays to .npz file (compressed) + # FIX: Use an open file object to prevent numpy from appending .npz to the temp filename + with open(output_path, "wb") as f: + np.savez_compressed(f, **data) + + # Save metadata as separate JSON file + # Note: This writes directly to the final .json path (sidecar file) + metadata_path = output_path.with_suffix(".json") + metadata["filename"] = filename + + with open(metadata_path, "w") as f: + json.dump(metadata, f, indent=2) + + self.logger.info( + f"Saved {len(data)} arrays to {output_path} " + f"and metadata to {metadata_path}" + ) + + def should_skip(self, filename: str) -> bool: + """Check if output file already exists. + + Args: + filename: Name of the file to check + + Returns: + True if the output file already exists and we should skip processing + """ + output_path = self._get_output_path(filename) + exists = output_path.exists() + + if exists: + self.logger.info(f"Skipping {filename} - output already exists") + + return exists \ No newline at end of file diff --git a/examples/tutorials/etl_ensight_to_numpy/run_etl.py b/examples/tutorials/etl_ensight_to_numpy/run_etl.py new file mode 100644 index 0000000..8a17661 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_numpy/run_etl.py @@ -0,0 +1,64 @@ +"""EnSight to NumPy Tutorial ETL pipeline runner. + +Usage: + python run_etl.py \ + etl.source.input_dir=/path/to/ensight_data \ + etl.sink.output_dir=/path/to/numpy_output +""" + +import os +import sys + +# Add current directory to sys.path so Hydra can import tutorial modules +sys.path.insert(0, os.path.dirname(__file__)) + +import hydra +from hydra.utils import instantiate +from omegaconf import DictConfig + +from physicsnemo_curator.etl.etl_orchestrator import ETLOrchestrator +from physicsnemo_curator.etl.processing_config import ProcessingConfig +from physicsnemo_curator.utils import utils as curator_utils + + +@hydra.main(version_base="1.3", config_path=".", config_name="tutorial_config") +def main(cfg: DictConfig) -> None: + """Run the EnSight Gold to NumPy tutorial ETL pipeline.""" + # Setup multiprocessing + curator_utils.setup_multiprocessing() + + # Create processing config + processing_config = ProcessingConfig(**cfg.etl.processing) + + # Create validator if configured + validator = None + if "validator" in cfg.etl: + validator = instantiate( + cfg.etl.validator, + processing_config, + **{k: v for k, v in cfg.etl.source.items() if not k.startswith("_")}, + ) + + # Instantiate source (now EnSightDataSource) + source = instantiate(cfg.etl.source, processing_config) + + # Instantiate sink + sink = instantiate(cfg.etl.sink, processing_config) + + # Instantiate transformations + cfgs = {k: {"_args_": [processing_config]} for k in cfg.etl.transformations.keys()} + transformations = instantiate(cfg.etl.transformations, **cfgs) + + # Create and run orchestrator + orchestrator = ETLOrchestrator( + source=source, + sink=sink, + transformations=transformations, + processing_config=processing_config, + validator=validator, + ) + orchestrator.run() + + +if __name__ == "__main__": + main() diff --git a/examples/tutorials/etl_ensight_to_numpy/tutorial_config.yaml b/examples/tutorials/etl_ensight_to_numpy/tutorial_config.yaml new file mode 100644 index 0000000..b0afe11 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_numpy/tutorial_config.yaml @@ -0,0 +1,51 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Tutorial ETL Pipeline Configuration +# This demonstrates the complete CGNS -> NumPy processing pipeline + +etl: + # Processing settings + processing: + num_processes: 2 # Use 2 processes for small tutorial dataset + args: {} + + # Validation (runs first) + validator: + _target_: tutorial_validator.EnSightTutorialValidator + _convert_: all + input_dir: ??? # Provided via command line + validation_level: "fields" # Full validation including data content + + # Source (reads EnSight files) + source: + _target_: ensight_data_source.EnSightDataSource + _convert_: all + input_dir: ??? # Provided via command line + + # Transformations (convert to NumPy format) + transformations: + ensight_to_numpy: + _target_: ensight_to_numpy_transformation.EnSightToNumpyTransformation + _convert_: all + precision: "float32" # or "float64" for double precision + + # Sink (writes NumPy .npz files) + sink: + _target_: numpy_data_source.NumpyDataSource + _convert_: all + output_dir: ??? # Provided via command line + diff --git a/examples/tutorials/etl_ensight_to_numpy/tutorial_validator.py b/examples/tutorials/etl_ensight_to_numpy/tutorial_validator.py new file mode 100644 index 0000000..88880af --- /dev/null +++ b/examples/tutorials/etl_ensight_to_numpy/tutorial_validator.py @@ -0,0 +1,245 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import List + +import numpy as np +import pyvista as pv + +from physicsnemo_curator.etl.dataset_validators import ( + DatasetValidator, + ValidationError, + ValidationLevel, +) +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class EnSightTutorialValidator(DatasetValidator): + """Validator for EnSight Gold physics simulation dataset.""" + + def __init__( + self, cfg: ProcessingConfig, input_dir: str, validation_level: str = "fields" + ): + """Initialize the validator. + + Args: + cfg: Processing configuration + input_dir: Directory containing EnSight files to validate + validation_level: "structure" or "fields" + """ + super().__init__(cfg) + self.input_dir = Path(input_dir) + self.validation_level = ValidationLevel(validation_level) + + # Define minimum requirements + self.min_points = 3 + self.min_cells = 1 + self.expected_spatial_dims = 3 + + def validate(self) -> List[ValidationError]: + """Validate all EnSight case files in the input directory.""" + errors = [] + + if not self.input_dir.exists(): + errors.append( + ValidationError( + path=self.input_dir, + message=f"Input directory does not exist: {self.input_dir}", + level=self.validation_level, + ) + ) + return errors + + case_files = list(self.input_dir.glob("*.case")) + if not case_files: + errors.append( + ValidationError( + path=self.input_dir, + message="No EnSight .case files found in input directory", + level=self.validation_level, + ) + ) + return errors + + for case_file in case_files: + file_errors = self.validate_single_item(case_file) + errors.extend(file_errors) + + return errors + + def validate_single_item(self, item: Path) -> List[ValidationError]: + """Validate a single EnSight .case file.""" + errors = [] + + try: + # Read EnSight file + reader = pv.get_reader(str(item)) + + # FIX: Use correct methods to enable all arrays instead of 'read_all_variables' + if hasattr(reader, "enable_all_point_arrays"): + reader.enable_all_point_arrays() + if hasattr(reader, "enable_all_cell_arrays"): + reader.enable_all_cell_arrays() + + dataset = reader.read() + + if not dataset or len(dataset) == 0: + errors.append( + ValidationError( + path=item, + message="EnSight file contains no valid blocks", + level=self.validation_level, + ) + ) + return errors + + # Merge blocks and extract surface for validation + full_mesh = None + for block in dataset: + if block and block.n_points > 0: + mesh = block.extract_surface().triangulate() + if full_mesh is None: + full_mesh = mesh + else: + full_mesh = full_mesh.merge(mesh) + + if full_mesh is None or full_mesh.n_points < self.min_points: + errors.append( + ValidationError( + path=item, + message="Merged surface mesh has too few points", + level=self.validation_level, + ) + ) + return errors + + # Structure validation + errors.extend(self._validate_structure(full_mesh, item)) + + # Field validation + if self.validation_level == ValidationLevel.FIELDS: + errors.extend(self._validate_fields(full_mesh, item)) + + except Exception as e: + errors.append( + ValidationError( + path=item, + message=f"Failed to open or process EnSight file: {str(e)}", + level=self.validation_level, + ) + ) + + return errors + + def _validate_structure( + self, mesh: pv.PolyData, file_path: Path + ) -> List[ValidationError]: + """Validate mesh structure.""" + errors = [] + + if mesh.n_points < self.min_points: + errors.append( + ValidationError( + path=file_path, + message=f"Mesh has too few points: {mesh.n_points}", + level=self.validation_level, + ) + ) + + if mesh.n_cells < self.min_cells: + errors.append( + ValidationError( + path=file_path, + message=f"Mesh has too few cells: {mesh.n_cells}", + level=self.validation_level, + ) + ) + + if mesh.points.shape[1] != self.expected_spatial_dims: + errors.append( + ValidationError( + path=file_path, + message=f"Mesh points have wrong dimensions: expected {self.expected_spatial_dims}D", + level=self.validation_level, + ) + ) + + try: + surf = mesh.extract_surface().triangulate() + if surf.n_points == 0: + errors.append( + ValidationError( + path=file_path, + message="Extracted surface mesh has no points", + level=self.validation_level, + ) + ) + except Exception as e: + errors.append( + ValidationError( + path=file_path, + message=f"Failed to extract surface mesh: {str(e)}", + level=self.validation_level, + ) + ) + + return errors + + def _validate_fields(self, mesh: pv.PolyData, file_path: Path) -> List[ValidationError]: + """Validate point data fields.""" + errors = [] + + if mesh.cell_data: + mesh = mesh.cell_data_to_point_data() + + if not mesh.point_data: + errors.append( + ValidationError( + path=file_path, + message="Mesh has no point data fields", + level=self.validation_level, + ) + ) + return errors + + for name, data in mesh.point_data.items(): + if len(data) != mesh.n_points: + errors.append( + ValidationError( + path=file_path, + message=f"Field '{name}' length does not match number of points", + level=self.validation_level, + ) + ) + if np.isnan(data).any(): + errors.append( + ValidationError( + path=file_path, + message=f"Field '{name}' contains NaN values", + level=self.validation_level, + ) + ) + if np.isinf(data).any(): + errors.append( + ValidationError( + path=file_path, + message=f"Field '{name}' contains infinite values", + level=self.validation_level, + ) + ) + + return errors \ No newline at end of file diff --git a/examples/tutorials/etl_ensight_to_zarr/README.md b/examples/tutorials/etl_ensight_to_zarr/README.md new file mode 100644 index 0000000..92e8ff3 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_zarr/README.md @@ -0,0 +1,135 @@ +# PhysicsNeMo-Curator Tutorial: EnSight Gold to Zarr + +## Overview + +This tutorial demonstrates how to use the PhysicsNeMo-Curator ETL pipeline to: + +1. Extract physics simulation data from **EnSight Gold** (`.case`) files. +2. Transform the data into an optimized, AI-model-ready format (Zarr). +3. Write the transformed data to disk efficiently. + +## 1. Create a Dataset + +PhysicsNeMo-Curator works with well-defined formats and schemas. For this tutorial, we define a custom simulation dataset using: + +* **Format**: EnSight Gold (a common CFD post-processing format). +* **Storage**: Local filesystem (consisting of a `.case` file and associated geometry/variable files). +* **Schema**: Each simulation run contains a mesh with the following fields: + +| Field Name | Type | Description | +| --- | --- | --- | +| `coordinates` | `(N, 3)` | Spatial coordinates (x, y, z) of mesh points | +| `faces` | `(M, 4)` | Mesh connectivity (triangulated surface) | +| `Temperature` | `(N,)` | Scalar temperature field | +| `Pressure` | `(N,)` | Scalar pressure field | +| `Velocity` | `(N, 3)` | 3D velocity vector field | +| `Density` | `(N,)` | Scalar density field | +| `Vorticity` | `(N,)` | Scalar vorticity field | + +### Generate Sample Data + +We have provided a script to generate 5 simulation runs with random physics-like data on a spherical mesh. + +To generate the data: + +```bash +python generate_sample_data.py + +``` + +This will create a `tutorial_data/` directory containing 5 sets of EnSight Gold files (e.g., `run_001.case` and its data files). + +## 2. The ETL Pipeline + +The pipeline consists of four main components orchestrated to process files in parallel. + +### A. Source: `EnSightDataSource` + +* **File**: `ensight_data_source.py` +* **Function**: Reads EnSight Gold files using `vtk` and `pyvista`. +* **MultiBlock Handling**: Automatically flattens complex MultiBlock datasets often found in EnSight exports. +* **Surface Extraction**: Extracts and merges surfaces from multiple parts. +* **Variable Handling**: Handles inconsistent variables across blocks by padding missing fields with zeros during the merge process. + + + +### B. Transformation: `EnSightToZarrTransformation` + +* **File**: `ensight_to_zarr_transformation.py` +* **Function**: Converts the raw mesh data into a Zarr-optimized format. +* **Chunking & Compression**: Applies chunks and Zstd compression for efficient storage. +* **Scalar Statistics**: Computes min, max, and mean for scalar fields (e.g., `Temperature`). +* **Vector Statistics**: Computes magnitudes and statistics for vector fields (e.g., `Velocity` -> `Velocity_magnitude`). + + + +### C. Sink: `ZarrDataSource` + +* **File**: `zarr_data_source.py` +* **Function**: Writes the transformed data into individual Zarr stores (directories). It preserves the metadata and structure defined by the transformation. + +### D. Validator: `EnSightTutorialValidator` + +* **File**: `tutorial_validator.py` +* **Function**: Validates the input EnSight files before processing. It checks for: +* **Readability**: Ensures the `.case` file and its dependencies can be opened. +* **Structure**: Verifies minimum point/cell counts and 3D spatial dimensions. +* **Data Integrity**: Checks for the presence of point data and ensures no NaNs or infinite values exist. + + + +## 3. Configuration + +The pipeline is configured using Hydra via `tutorial_config.yaml`. + +```yaml +etl: + processing: + num_processes: 2 # Parallel execution + + validator: + _target_: tutorial_validator.EnSightTutorialValidator + validation_level: "fields" + + source: + _target_: ensight_data_source.EnSightDataSource + + transformations: + ensight_to_zarr: + _target_: ensight_to_zarr_transformation.EnSightToZarrTransformation + chunk_size: 500 + compression_level: 3 + + sink: + _target_: zarr_data_source.ZarrDataSource + +``` + +## 4. Run the Pipeline + +To run the ETL pipeline, use the `run_etl.py` script. You must specify the input and output directories. + +```bash +python run_etl.py \ + etl.source.input_dir=tutorial_data \ + etl.sink.output_dir=output_zarr + +``` + +**Note:** If you are processing a very large dataset where validation takes too long, you can skip the validation step by adding `~etl.validator` to the command. + +## 5. Output + +After execution, the `output_zarr/` directory will contain a separate Zarr store for each run (e.g., `run_001.zarr`). Each store will contain: + +* **Arrays**: +* `coordinates` +* `faces` +* `Temperature`, `Pressure`, `Density`, `Vorticity` +* `Velocity` +* `Velocity_magnitude` (derived) + + +* **Metadata (`.zattrs`)**: +* Simulation statistics (e.g., `Temperature_max`, `Velocity_magnitude_mean`) +* Technical details (`num_points`, `chunk_size`, `compression`) \ No newline at end of file diff --git a/examples/tutorials/etl_ensight_to_zarr/__init__.py b/examples/tutorials/etl_ensight_to_zarr/__init__.py new file mode 100644 index 0000000..aa5ad60 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_zarr/__init__.py @@ -0,0 +1,25 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tutorial ETL pipeline for HDF5 to Zarr conversion.""" + +from . import ( + h5_data_source, # noqa: F401 + h5_to_zarr_transformation, # noqa: F401 + tutorial_config, # noqa: F401 + tutorial_validator, # noqa: F401 + zarr_data_source, # noqa: F401 +) diff --git a/examples/tutorials/etl_ensight_to_zarr/ensight_data_source.py b/examples/tutorials/etl_ensight_to_zarr/ensight_data_source.py new file mode 100644 index 0000000..8ff71b5 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_zarr/ensight_data_source.py @@ -0,0 +1,154 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import Any, Dict, List + +import numpy as np +import pyvista as pv +import vtk + +from physicsnemo_curator.etl.data_sources import DataSource +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class EnSightDataSource(DataSource): + """Drop-in replacement for CGNSDataSource to read EnSight Gold .case files.""" + + def __init__(self, cfg: ProcessingConfig, input_dir: str): + super().__init__(cfg) + self.input_dir = Path(input_dir) + if not self.input_dir.exists(): + raise FileNotFoundError(f"Input directory {self.input_dir} does not exist") + + def get_file_list(self) -> List[str]: + """Get list of .case files to process.""" + case_files = list(self.input_dir.glob("*.case")) + filenames = [f.stem for f in case_files] + self.logger.info(f"Found {len(filenames)} EnSight files to process") + return sorted(filenames) + + def _read_ensight(self, filepath: Path) -> pv.MultiBlock: + """Read an EnSight Gold file and return a MultiBlock dataset.""" + try: + reader = vtk.vtkEnSightGoldBinaryReader() + reader.SetCaseFileName(str(filepath)) + reader.ReadAllVariablesOn() + reader.Update() + return pv.wrap(reader.GetOutput()) + except Exception as e: + self.logger.error(f"Error reading EnSight file {filepath}: {e}") + return None + + def _flatten_multiblock(self, dataset: pv.MultiBlock) -> List[pv.DataSet]: + """Recursively flatten MultiBlock into a list of non-empty blocks.""" + blocks = [] + for block in dataset: + if isinstance(block, pv.MultiBlock): + blocks.extend(self._flatten_multiblock(block)) + elif block and block.n_points > 0 and block.n_cells > 0: + blocks.append(block) + return blocks + + def _extract_surface_mesh(self, dataset: pv.MultiBlock) -> pv.PolyData: + """Extract a clean surface mesh handling inconsistent variables across blocks.""" + blocks = self._flatten_multiblock(dataset) + + if not blocks: + raise RuntimeError("No valid blocks found in EnSight dataset") + + # Separate blocks into full vs partial (having >1 point data field) + blocks_full, blocks_partial = [], [] + for block in blocks: + if len(block.point_data.keys()) > 1: + blocks_full.append(block) + else: + blocks_partial.append(block) + + surface_full = pv.merge([b.extract_surface() for b in blocks_full]) if blocks_full else None + surface_partial = pv.merge([b.extract_surface() for b in blocks_partial]) if blocks_partial else None + + if surface_full is None and surface_partial is None: + raise RuntimeError("No valid surfaces could be extracted") + + if surface_full is None: + return surface_partial.triangulate() + if surface_partial is None: + return surface_full.triangulate() + + # Fill missing variables in partial blocks + full_arrays = surface_full.point_data + partial_arrays = surface_partial.point_data + missing = set(full_arrays.keys()) - set(partial_arrays.keys()) + for name in missing: + src = full_arrays[name] + placeholder = np.zeros((surface_partial.n_points, src.shape[1])) if src.ndim > 1 else np.zeros(surface_partial.n_points) + surface_partial.point_data[name] = placeholder + self.logger.debug(f"Added placeholder for missing variable '{name}'") + + # Merge surfaces + final_mesh = surface_full.merge(surface_partial) + return final_mesh.triangulate() + + def _read_mesh(self, filename: str) -> pv.PolyData: + """Read a single EnSight file and return the triangulated surface mesh.""" + filepath = self.input_dir / f"{filename}.case" + if not filepath.exists(): + raise FileNotFoundError(f"File not found: {filepath}") + + self.logger.warning(f"Reading {filepath}") + dataset = self._read_ensight(filepath) + if dataset is None: + raise ValueError(f"Failed to read EnSight file: {filepath}") + + surface_mesh = self._extract_surface_mesh(dataset) + + # Convert cell data to point data if present + if surface_mesh.cell_data: + surface_mesh = surface_mesh.cell_data_to_point_data() + + return surface_mesh + + def read_file(self, filename: str) -> Dict[str, Any]: + """Read one EnSight file and extract all data in the same format as CGNSDataSource.""" + surface_mesh = self._read_mesh(filename) + + data = { + "coordinates": np.array(surface_mesh.points), + "faces": np.array(surface_mesh.faces).reshape(-1, 4)[:, 1:], # triangulated + "metadata": { + "n_points": surface_mesh.n_points, + "n_cells": surface_mesh.n_cells, + "bounds": surface_mesh.bounds, + }, + "filename": filename + } + + for field_name in surface_mesh.point_data.keys(): + data[field_name] = np.array(surface_mesh.point_data[field_name]) + self.logger.info(f"Extracted point data field: {field_name}") + + self.logger.warning(f"Loaded data with {surface_mesh.n_points} points and {surface_mesh.n_cells} cells") + return data + + def _get_output_path(self, filename: str) -> Path: + raise NotImplementedError("EnSightDataSource only supports reading") + + def _write_impl_temp_file(self, data: Dict[str, Any], output_path: Path) -> None: + raise NotImplementedError("EnSightDataSource only supports reading") + + def should_skip(self, filename: str) -> bool: + return False diff --git a/examples/tutorials/etl_ensight_to_zarr/ensight_to_zarr_transformation.py b/examples/tutorials/etl_ensight_to_zarr/ensight_to_zarr_transformation.py new file mode 100644 index 0000000..e33b0c7 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_zarr/ensight_to_zarr_transformation.py @@ -0,0 +1,120 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict + +import numpy as np +import zarr + +from physicsnemo_curator.etl.data_transformations import DataTransformation +from physicsnemo_curator.etl.processing_config import ProcessingConfig + +class EnSightToZarrTransformation(DataTransformation): + """Transform EnSight Gold data into Zarr-optimized format.""" + + def __init__( + self, cfg: ProcessingConfig, chunk_size: int = 500, compression_level: int = 3 + ): + super().__init__(cfg) + self.chunk_size = chunk_size + self.compression_level = compression_level + + self.compressor = zarr.codecs.BloscCodec( + cname="zstd", + clevel=self.compression_level, + shuffle=zarr.codecs.BloscShuffle.shuffle, + ) + + def transform(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform EnSight data to Zarr-optimized format.""" + self.logger.info(f"Transforming {data['filename']} for Zarr storage") + + num_points = len(data["coordinates"]) + chunk_points = min(self.chunk_size, num_points) + + zarr_data = { + "coordinates": { + "data": data["coordinates"].astype(np.float32), + "chunks": (chunk_points, 3), + "compressor": self.compressor, + "dtype": np.float32, + }, + "faces": {}, + } + + if "faces" in data: + num_cells = len(data["faces"]) + chunk_cells = min(self.chunk_size, num_cells) + zarr_data["faces"] = { + "data": data["faces"].astype(np.int32), + "chunks": (chunk_cells, 3), + "compressor": self.compressor, + "dtype": np.int32, + } + + # Process all point data fields + for field_name, field_data in data.items(): + if field_name in ["coordinates", "faces", "metadata", "filename"]: + continue + + if isinstance(field_data, np.ndarray): + self.logger.info(f"Processing field: {field_name} with shape {field_data.shape}") + + if field_data.ndim == 1: + zarr_data[field_name] = { + "data": field_data.astype(np.float32), + "chunks": (chunk_points,), + "compressor": self.compressor, + "dtype": np.float32, + } + elif field_data.ndim == 2: + zarr_data[field_name] = { + "data": field_data.astype(np.float32), + "chunks": (chunk_points, field_data.shape[1]), + "compressor": self.compressor, + "dtype": np.float32, + } + + # Add metadata and statistics + metadata = data.get("metadata", {}) + metadata.update({ + "num_points": num_points, + "chunk_size": chunk_points, + "compression": "zstd", + "compression_level": self.compression_level, + }) + + for field_name, field_data in data.items(): + if field_name in ["coordinates", "faces", "metadata", "filename"]: + continue + if isinstance(field_data, np.ndarray): + if field_data.ndim == 1: + metadata[f"{field_name}_min"] = float(np.min(field_data)) + metadata[f"{field_name}_max"] = float(np.max(field_data)) + metadata[f"{field_name}_mean"] = float(np.mean(field_data)) + elif field_data.ndim == 2: + magnitude = np.linalg.norm(field_data, axis=1) + metadata[f"{field_name}_magnitude_max"] = float(np.max(magnitude)) + metadata[f"{field_name}_magnitude_mean"] = float(np.mean(magnitude)) + zarr_data[f"{field_name}_magnitude"] = { + "data": magnitude.astype(np.float32), + "chunks": (chunk_points,), + "compressor": self.compressor, + "dtype": np.float32, + } + + zarr_data["metadata"] = metadata + return zarr_data diff --git a/examples/tutorials/etl_ensight_to_zarr/generate_sample_data.py b/examples/tutorials/etl_ensight_to_zarr/generate_sample_data.py new file mode 100644 index 0000000..1428404 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_zarr/generate_sample_data.py @@ -0,0 +1,77 @@ +import os +import numpy as np +import pyvista as pv +from vtk import vtkEnSightWriter + +def generate_mesh_with_fields(run_number, resolution=20): + """Generate a mesh with simulation fields for one run.""" + radius = 1.0 + 0.1 * np.sin(run_number) + mesh = pv.Sphere(radius=radius, theta_resolution=resolution, phi_resolution=resolution) + + points = mesh.points + num_points = mesh.n_points + + distances = np.linalg.norm(points, axis=1) + temperature = 250.0 + 100.0 * (distances / distances.max()) + np.random.normal(0, 5, num_points) + mesh.point_data["Temperature"] = temperature.astype(np.float32) + + pressure = 101325.0 + 1000.0 * np.sin(points[:, 2] * np.pi) + np.random.normal(0, 100, num_points) + mesh.point_data["Pressure"] = pressure.astype(np.float32) + + velocity = np.zeros((num_points, 3), dtype=np.float32) + velocity[:, 0] = -points[:, 1] * 2.0 + np.random.normal(0, 0.5, num_points) + velocity[:, 1] = points[:, 0] * 2.0 + np.random.normal(0, 0.5, num_points) + velocity[:, 2] = points[:, 2] * 0.5 + np.random.normal(0, 0.3, num_points) + mesh.point_data["Velocity"] = velocity + + density = 1.2 + 0.2 * np.sin(distances * np.pi * 2) + np.random.normal(0, 0.05, num_points) + mesh.point_data["Density"] = density.astype(np.float32) + + vorticity = 2.0 * np.ones(num_points) + 0.5 * np.cos(distances * 3) + np.random.normal(0, 0.1, num_points) + mesh.point_data["Vorticity"] = vorticity.astype(np.float32) + + return mesh + +def create_ensight_case_file(run_number, output_dir="tutorial_data", resolution=20): + """Create one EnSight Gold case file for a simulation run.""" + os.makedirs(output_dir, exist_ok=True) + + mesh = generate_mesh_with_fields(run_number, resolution) + num_points = mesh.n_points + num_cells = mesh.n_cells + + # Create base filename + base_name = f"run_{run_number:03d}" + case_file = os.path.join(output_dir, f"{base_name}.case") + + # PyVista VTK EnSightWriter requires vtk.vtkUnstructuredGrid + # Wrap the mesh as unstructured grid if necessary + ug = pv.wrap(mesh) + + # Write EnSight Gold binary files + writer = vtkEnSightWriter() + writer.SetFileName(case_file) + writer.SetInputData(ug) + writer.SetCaseFileName(case_file) + writer.WriteAllVariablesOn() + writer.Update() + + print(f"Created {case_file}:") + print(f" - {num_points} points") + print(f" - {num_cells} cells") + print(f" - Fields: {', '.join(mesh.point_data.keys())}") + +def main(): + print("Generating sample EnSight Gold physics simulation dataset...\n") + resolutions = [15, 20, 18, 22, 20] + + for run_num in range(1, 6): + create_ensight_case_file(run_num, resolution=resolutions[run_num - 1]) + + print("\nDataset generation complete!") + print("Created 5 EnSight Gold .case files in the 'tutorial_data/' directory") + print("Each file contains a sphere mesh with Temperature, Pressure, Velocity, Density, and Vorticity fields") + print("\nYou can now run the ETL pipeline with your EnSightDataSource") + +if __name__ == "__main__": + main() diff --git a/examples/tutorials/etl_ensight_to_zarr/run_etl.py b/examples/tutorials/etl_ensight_to_zarr/run_etl.py new file mode 100644 index 0000000..fd1d5e4 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_zarr/run_etl.py @@ -0,0 +1,81 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""EnSight to Zarr Tutorial ETL pipeline runner. + +Usage: + python run_etl.py \ + etl.source.input_dir=/path/to/ensight_data \ + etl.sink.output_dir=/path/to/zarr_output +""" + +import os +import sys + +# Add current directory to sys.path so Hydra can import tutorial modules +sys.path.insert(0, os.path.dirname(__file__)) + +import hydra +from hydra.utils import instantiate +from omegaconf import DictConfig + +from physicsnemo_curator.etl.etl_orchestrator import ETLOrchestrator +from physicsnemo_curator.etl.processing_config import ProcessingConfig +from physicsnemo_curator.utils import utils as curator_utils + + +@hydra.main(version_base="1.3", config_path=".", config_name="tutorial_config") +def main(cfg: DictConfig) -> None: + """Run the EnSight Gold to Zarr tutorial ETL pipeline.""" + # Setup multiprocessing + curator_utils.setup_multiprocessing() + + # Create processing config + processing_config = ProcessingConfig(**cfg.etl.processing) + + # Create validator if configured + validator = None + if "validator" in cfg.etl: + validator = instantiate( + cfg.etl.validator, + processing_config, + **{k: v for k, v in cfg.etl.source.items() if not k.startswith("_")}, + ) + + # Instantiate source (now EnSightDataSource) + source = instantiate(cfg.etl.source, processing_config) + + # Instantiate sink + sink = instantiate(cfg.etl.sink, processing_config) + + # Instantiate transformations + cfgs = {k: {"_args_": [processing_config]} for k in cfg.etl.transformations.keys()} + transformations = instantiate(cfg.etl.transformations, **cfgs) + + # Create and run orchestrator + orchestrator = ETLOrchestrator( + source=source, + sink=sink, + transformations=transformations, + processing_config=processing_config, + validator=validator, + ) + orchestrator.run() + + +if __name__ == "__main__": + main() + diff --git a/examples/tutorials/etl_ensight_to_zarr/tutorial_config.yaml b/examples/tutorials/etl_ensight_to_zarr/tutorial_config.yaml new file mode 100644 index 0000000..12135bb --- /dev/null +++ b/examples/tutorials/etl_ensight_to_zarr/tutorial_config.yaml @@ -0,0 +1,52 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Tutorial ETL Pipeline Configuration +# This demonstrates the complete CGNS -> Zarr processing pipeline + +etl: + # Processing settings + processing: + num_processes: 2 # Use 2 processes for small tutorial dataset + args: {} + + # Validation (runs first) + validator: + _target_: tutorial_validator.EnSightTutorialValidator + _convert_: all + input_dir: ??? # Provided via command line + validation_level: "fields" # Full validation including data content + + # Source (reads EnSight files) + source: + _target_: ensight_data_source.EnSightDataSource + _convert_: all + input_dir: ??? # Provided via command line + + # Transformations (convert to Zarr format) + transformations: + ensight_to_zarr: + _target_: ensight_to_zarr_transformation.EnSightToZarrTransformation + _convert_: all + chunk_size: 500 + compression_level: 3 + + # Sink (writes Zarr stores) + sink: + _target_: zarr_data_source.ZarrDataSource + _convert_: all + output_dir: ??? # Provided via command line + diff --git a/examples/tutorials/etl_ensight_to_zarr/tutorial_validator.py b/examples/tutorials/etl_ensight_to_zarr/tutorial_validator.py new file mode 100644 index 0000000..88880af --- /dev/null +++ b/examples/tutorials/etl_ensight_to_zarr/tutorial_validator.py @@ -0,0 +1,245 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import List + +import numpy as np +import pyvista as pv + +from physicsnemo_curator.etl.dataset_validators import ( + DatasetValidator, + ValidationError, + ValidationLevel, +) +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class EnSightTutorialValidator(DatasetValidator): + """Validator for EnSight Gold physics simulation dataset.""" + + def __init__( + self, cfg: ProcessingConfig, input_dir: str, validation_level: str = "fields" + ): + """Initialize the validator. + + Args: + cfg: Processing configuration + input_dir: Directory containing EnSight files to validate + validation_level: "structure" or "fields" + """ + super().__init__(cfg) + self.input_dir = Path(input_dir) + self.validation_level = ValidationLevel(validation_level) + + # Define minimum requirements + self.min_points = 3 + self.min_cells = 1 + self.expected_spatial_dims = 3 + + def validate(self) -> List[ValidationError]: + """Validate all EnSight case files in the input directory.""" + errors = [] + + if not self.input_dir.exists(): + errors.append( + ValidationError( + path=self.input_dir, + message=f"Input directory does not exist: {self.input_dir}", + level=self.validation_level, + ) + ) + return errors + + case_files = list(self.input_dir.glob("*.case")) + if not case_files: + errors.append( + ValidationError( + path=self.input_dir, + message="No EnSight .case files found in input directory", + level=self.validation_level, + ) + ) + return errors + + for case_file in case_files: + file_errors = self.validate_single_item(case_file) + errors.extend(file_errors) + + return errors + + def validate_single_item(self, item: Path) -> List[ValidationError]: + """Validate a single EnSight .case file.""" + errors = [] + + try: + # Read EnSight file + reader = pv.get_reader(str(item)) + + # FIX: Use correct methods to enable all arrays instead of 'read_all_variables' + if hasattr(reader, "enable_all_point_arrays"): + reader.enable_all_point_arrays() + if hasattr(reader, "enable_all_cell_arrays"): + reader.enable_all_cell_arrays() + + dataset = reader.read() + + if not dataset or len(dataset) == 0: + errors.append( + ValidationError( + path=item, + message="EnSight file contains no valid blocks", + level=self.validation_level, + ) + ) + return errors + + # Merge blocks and extract surface for validation + full_mesh = None + for block in dataset: + if block and block.n_points > 0: + mesh = block.extract_surface().triangulate() + if full_mesh is None: + full_mesh = mesh + else: + full_mesh = full_mesh.merge(mesh) + + if full_mesh is None or full_mesh.n_points < self.min_points: + errors.append( + ValidationError( + path=item, + message="Merged surface mesh has too few points", + level=self.validation_level, + ) + ) + return errors + + # Structure validation + errors.extend(self._validate_structure(full_mesh, item)) + + # Field validation + if self.validation_level == ValidationLevel.FIELDS: + errors.extend(self._validate_fields(full_mesh, item)) + + except Exception as e: + errors.append( + ValidationError( + path=item, + message=f"Failed to open or process EnSight file: {str(e)}", + level=self.validation_level, + ) + ) + + return errors + + def _validate_structure( + self, mesh: pv.PolyData, file_path: Path + ) -> List[ValidationError]: + """Validate mesh structure.""" + errors = [] + + if mesh.n_points < self.min_points: + errors.append( + ValidationError( + path=file_path, + message=f"Mesh has too few points: {mesh.n_points}", + level=self.validation_level, + ) + ) + + if mesh.n_cells < self.min_cells: + errors.append( + ValidationError( + path=file_path, + message=f"Mesh has too few cells: {mesh.n_cells}", + level=self.validation_level, + ) + ) + + if mesh.points.shape[1] != self.expected_spatial_dims: + errors.append( + ValidationError( + path=file_path, + message=f"Mesh points have wrong dimensions: expected {self.expected_spatial_dims}D", + level=self.validation_level, + ) + ) + + try: + surf = mesh.extract_surface().triangulate() + if surf.n_points == 0: + errors.append( + ValidationError( + path=file_path, + message="Extracted surface mesh has no points", + level=self.validation_level, + ) + ) + except Exception as e: + errors.append( + ValidationError( + path=file_path, + message=f"Failed to extract surface mesh: {str(e)}", + level=self.validation_level, + ) + ) + + return errors + + def _validate_fields(self, mesh: pv.PolyData, file_path: Path) -> List[ValidationError]: + """Validate point data fields.""" + errors = [] + + if mesh.cell_data: + mesh = mesh.cell_data_to_point_data() + + if not mesh.point_data: + errors.append( + ValidationError( + path=file_path, + message="Mesh has no point data fields", + level=self.validation_level, + ) + ) + return errors + + for name, data in mesh.point_data.items(): + if len(data) != mesh.n_points: + errors.append( + ValidationError( + path=file_path, + message=f"Field '{name}' length does not match number of points", + level=self.validation_level, + ) + ) + if np.isnan(data).any(): + errors.append( + ValidationError( + path=file_path, + message=f"Field '{name}' contains NaN values", + level=self.validation_level, + ) + ) + if np.isinf(data).any(): + errors.append( + ValidationError( + path=file_path, + message=f"Field '{name}' contains infinite values", + level=self.validation_level, + ) + ) + + return errors \ No newline at end of file diff --git a/examples/tutorials/etl_ensight_to_zarr/zarr_data_source.py b/examples/tutorials/etl_ensight_to_zarr/zarr_data_source.py new file mode 100644 index 0000000..176f7f0 --- /dev/null +++ b/examples/tutorials/etl_ensight_to_zarr/zarr_data_source.py @@ -0,0 +1,118 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import Any, Dict, List + +import zarr +from zarr.storage import LocalStore + +from physicsnemo_curator.etl.data_sources import DataSource +from physicsnemo_curator.etl.processing_config import ProcessingConfig + + +class ZarrDataSource(DataSource): + """DataSource for writing to Zarr stores.""" + + def __init__(self, cfg: ProcessingConfig, output_dir: str): + """Initialize the Zarr data source. + + Args: + cfg: Processing configuration + output_dir: Directory to write Zarr stores + """ + super().__init__(cfg) + self.output_dir = Path(output_dir) + + # Create output directory if it doesn't exist + self.output_dir.mkdir(parents=True, exist_ok=True) + + def get_file_list(self) -> List[str]: + """Not implemented - this DataSource only writes.""" + raise NotImplementedError("ZarrDataSource only supports writing") + + def read_file(self, filename: str) -> Dict[str, Any]: + """Not implemented - this DataSource only writes.""" + raise NotImplementedError("ZarrDataSource only supports writing") + + def _get_output_path(self, filename: str) -> Path: + """Get the output path for a given filename. + + Args: + filename: Name of the file to process + + Returns: + Path object representing the output location. + """ + return self.output_dir / f"{filename}.zarr" + + def _write_impl_temp_file(self, data: Dict[str, Any], output_path: Path) -> None: + """ + Implement actual data writing logic to a temporary Zarr store. + ... + """ + # Create Zarr store + self.logger.info(f"Creating Zarr store: {output_path}") + store = LocalStore(output_path) + root = zarr.open_group(store=store, mode="w") + + # Store metadata as root attributes + if "metadata" in data: + for key, value in data["metadata"].items(): + if hasattr(value, "item"): # numpy scalar + value = value.item() + root.attrs[key] = value + data.pop("metadata") + + # Write all arrays from the transformation + for array_name, array_info in data.items(): + root.create_array( + name=array_name, + data=array_info["data"], + chunks=array_info["chunks"], + compressors=( + array_info["compressor"] if array_info["compressor"] else None + ), + # REMOVED: dtype=array_info["dtype"], + ) + + # Add some store-level metadata + root.attrs["zarr_format"] = 3 + root.attrs["created_by"] = "physicsnemo-curator-tutorial" + + # Something weird is happening here. + # If this error occurs, the stores are created and we move to the next one. + # If this error does NOT occur, we seem to skip all the remaining files. + # Debug with Alexey. + self.logger.info("Successfully created Zarr store") + + def should_skip(self, filename: str) -> bool: + """Check if we should skip writing this store. + + Args: + filename: Base filename to check + + Returns: + True if store should be skipped (already exists) + """ + store_path = self.output_dir / f"{filename}.zarr" + exists = store_path.exists() + + if exists: + self.logger.info(f"Skipping {filename} - Zarr store already exists") + return True + + return False