From 2413c4fa96fa31d6f5baccee4caee60816e7246b Mon Sep 17 00:00:00 2001 From: Sai Krishnan Chandrasekar Date: Tue, 21 Oct 2025 17:16:24 -0700 Subject: [PATCH 1/4] Enable sharding for large arrays --- .../config/external_aero_etl_drivaerml.yaml | 2 +- .../external_aerodynamics/data_sources.py | 2 + .../data_transformations.py | 31 ++++- examples/external_aerodynamics/schemas.py | 1 + .../test_data_transformations.py | 131 ++++++++++++++++++ 5 files changed, 165 insertions(+), 2 deletions(-) diff --git a/examples/external_aerodynamics/config/external_aero_etl_drivaerml.yaml b/examples/external_aerodynamics/config/external_aero_etl_drivaerml.yaml index fe583eb..ba59b13 100644 --- a/examples/external_aerodynamics/config/external_aero_etl_drivaerml.yaml +++ b/examples/external_aerodynamics/config/external_aero_etl_drivaerml.yaml @@ -28,7 +28,7 @@ etl: model_type: surface # produce data for which model? surface, volume, combined processing: - num_processes: 12 + num_processes: 4 args: {} validator: diff --git a/examples/external_aerodynamics/data_sources.py b/examples/external_aerodynamics/data_sources.py index 7c364f3..fffdb92 100644 --- a/examples/external_aerodynamics/data_sources.py +++ b/examples/external_aerodynamics/data_sources.py @@ -246,6 +246,7 @@ def _write_zarr( name=field, data=array_info.data, chunks=array_info.chunks, + shards=array_info.shards, compressors=array_info.compressor if array_info.compressor else None, ) @@ -264,6 +265,7 @@ def _write_zarr( name=field, data=array_info.data, chunks=array_info.chunks, + shards=array_info.shards, compressors=( array_info.compressor if array_info.compressor else None ), diff --git a/examples/external_aerodynamics/data_transformations.py b/examples/external_aerodynamics/data_transformations.py index 1c1bf0f..044ef48 100644 --- a/examples/external_aerodynamics/data_transformations.py +++ b/examples/external_aerodynamics/data_transformations.py @@ -229,6 +229,8 @@ def __init__( compression_method: str = "zstd", 
compression_level: int = 5, chunk_size_mb: float = 1.0, # Default 1MB chunk size + chunks_per_shard: int = 1000, # Number of chunks per shard (~1GB shards with 1MB chunks) + sharding_threshold_gb: float = 1.0, # Enable sharding for arrays > 1GB ): super().__init__(cfg) # Zarr 3 codec configuration @@ -238,6 +240,8 @@ def __init__( shuffle=zarr.codecs.BloscShuffle.shuffle, ) self.chunk_size_mb = chunk_size_mb + self.chunks_per_shard = chunks_per_shard + self.sharding_threshold_gb = sharding_threshold_gb # Warn if chunk size might be problematic if chunk_size_mb < 1.0: @@ -254,7 +258,11 @@ def __init__( ) def _prepare_array(self, array: np.ndarray) -> PreparedZarrArrayInfo: - """Prepare array for Zarr storage with compression and chunking.""" + """Prepare array for Zarr storage with compression, chunking, and sharding. + + Sharding is automatically enabled for large arrays (>sharding_threshold_gb) + to reduce the number of files created on disk. Small arrays use chunks only. + """ if array is None: return None @@ -273,10 +281,31 @@ def _prepare_array(self, array: np.ndarray) -> PreparedZarrArrayInfo: ) chunks = (chunk_rows, shape[1]) + # Calculate shards for large arrays to reduce file count + shards = None + array_size_bytes = array.nbytes + threshold_bytes = int(self.sharding_threshold_gb * 1024 * 1024 * 1024) + + if array_size_bytes > threshold_bytes: + if len(shape) == 1: + # For 1D arrays: shard contains chunks_per_shard chunks + shard_size = min(shape[0], chunks[0] * self.chunks_per_shard) + shards = (shard_size,) + else: + # For 2D arrays: extend along first dimension to group multiple chunk-rows + shard_rows = min(shape[0], chunks[0] * self.chunks_per_shard) + shards = (shard_rows, shape[1]) + + logging.info( + f"Sharding enabled for array of size {array_size_bytes / (1024**3):.2f} GB: " + f"chunks={chunks}, shards={shards}" + ) + return PreparedZarrArrayInfo( data=np.float32(array), chunks=chunks, compressor=self.compressor, + shards=shards, ) def transform( 
diff --git a/examples/external_aerodynamics/schemas.py b/examples/external_aerodynamics/schemas.py index 5d0db99..80689ad 100644 --- a/examples/external_aerodynamics/schemas.py +++ b/examples/external_aerodynamics/schemas.py @@ -102,6 +102,7 @@ class PreparedZarrArrayInfo: data: np.ndarray chunks: tuple[int, ...] compressor: zarr.abc.codec + shards: Optional[tuple[int, ...]] = None @dataclass(frozen=True) diff --git a/tests/test_examples/test_external_aerodynamics/test_data_transformations.py b/tests/test_examples/test_external_aerodynamics/test_data_transformations.py index 909ce32..c392afa 100644 --- a/tests/test_examples/test_external_aerodynamics/test_data_transformations.py +++ b/tests/test_examples/test_external_aerodynamics/test_data_transformations.py @@ -384,6 +384,137 @@ def test_chunk_size_effect(self, sample_data_processed): large_data_num_chunks = np.ceil(100000 / large_data_elements_per_chunk) assert large_data_num_chunks == 1 + def test_sharding_disabled_for_small_arrays(self, sample_data_processed): + """Test that sharding is NOT enabled for small arrays below threshold.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation( + config, + chunk_size_mb=1.0, + chunks_per_shard=1000, + sharding_threshold_gb=1.0, + ) + + result = transform.transform(sample_data_processed) + + # All arrays in sample_data_processed are small (<1GB), so no sharding + assert result.stl_coordinates.shards is None + assert result.stl_centers.shards is None + assert result.stl_faces.shards is None + assert result.stl_areas.shards is None + assert result.surface_mesh_centers.shards is None + assert result.surface_normals.shards is None + assert result.surface_areas.shards is None + assert result.surface_fields.shards is None + assert result.volume_mesh_centers.shards is None + assert result.volume_fields.shards is None + + def test_sharding_enabled_for_large_arrays(self): + """Test that sharding IS enabled for large arrays above 
threshold.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation( + config, + chunk_size_mb=1.0, + chunks_per_shard=1000, + sharding_threshold_gb=1.0, + ) + + # Create a large array > 1GB + # For float32, 1 GB = 1024^3 / 4 = 268,435,456 elements + # Create a 2D array: 300M elements / 3 columns = 100M rows + # This is 300M * 4 bytes = 1.2 GB + large_array = np.random.rand(100_000_000, 3).astype(np.float64) + + metadata = ExternalAerodynamicsMetadata( + filename="test_large", + dataset_type=ModelType.COMBINED, + stream_velocity=30.0, + air_density=1.205, + ) + + large_data = ExternalAerodynamicsExtractedDataInMemory( + metadata=metadata, + stl_coordinates=np.array([[0, 0, 0], [1, 1, 1]], dtype=np.float64), + stl_centers=np.array([[0.5, 0.5, 0.5]], dtype=np.float64), + stl_faces=np.array([[0, 1, 2]], dtype=np.int32), + stl_areas=np.array([1.0], dtype=np.float64), + volume_fields=large_array, # Large array + ) + + result = transform.transform(large_data) + + # Small arrays should have no sharding + assert result.stl_coordinates.shards is None + assert result.stl_centers.shards is None + + # Large array should have sharding enabled + assert result.volume_fields.shards is not None + assert len(result.volume_fields.shards) == 2 # 2D array + assert ( + result.volume_fields.shards[1] == 3 + ) # Second dimension should match array + + def test_sharding_calculation_1d_array(self): + """Test correct shard shape calculation for 1D arrays.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation( + config, + chunk_size_mb=1.0, + chunks_per_shard=100, # Use smaller value for easier testing + sharding_threshold_gb=0.1, # Lower threshold for testing + ) + + # Create 1D array > 0.1GB (400MB as float64, 200MB as float32) + # 50M elements * 8 bytes = 400 MB + large_1d_array = np.random.rand(50_000_000).astype(np.float64) + + result = transform._prepare_array(large_1d_array) + + # Check that 
sharding is enabled + assert result.shards is not None + assert len(result.shards) == 1 + + # For 1D array: shard_size = chunks[0] * chunks_per_shard + # With 1MB chunks and float32 (4 bytes): chunk_size = 1MB / 4 = 262144 elements + # So shards should be ~26,214,400 elements (100 chunks) + expected_shard_size = result.chunks[0] * 100 + assert result.shards[0] == expected_shard_size + + def test_sharding_calculation_2d_array(self): + """Test correct shard shape calculation for 2D arrays.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation( + config, + chunk_size_mb=1.0, + chunks_per_shard=50, # Use smaller value for easier testing + sharding_threshold_gb=0.1, # Lower threshold for testing + ) + + # Create 2D array > 0.1GB (2.4GB as float64, 1.2GB as float32) + # 100M rows * 3 cols * 8 bytes = 2.4 GB + large_2d_array = np.random.rand(100_000_000, 3).astype(np.float64) + + result = transform._prepare_array(large_2d_array) + + # Check that sharding is enabled + assert result.shards is not None + assert len(result.shards) == 2 + + # For 2D arrays: shard_rows = chunk_rows * chunks_per_shard + # Second dimension should match array shape + expected_shard_rows = result.chunks[0] * 50 + assert result.shards[0] == expected_shard_rows + assert result.shards[1] == 3 # Matches second dimension + + def test_sharding_initialization_defaults(self): + """Test that default sharding parameters are set correctly.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation(config) + + # Check defaults match Zarr recommendations + assert transform.chunks_per_shard == 1000 # Zarr recommended + assert transform.sharding_threshold_gb == 1.0 + assert transform.chunk_size_mb == 1.0 + class TestExternalAerodynamicsSTLTransformation: """Test the ExternalAerodynamicsSTLTransformation class.""" From 6e635211fbd2bc1d13ce90e8bda3897222f49c12 Mon Sep 17 00:00:00 2001 From: Sai Krishnan Chandrasekar Date: Tue, 
21 Oct 2025 17:16:49 -0700 Subject: [PATCH 2/4] Fix incorrect config update --- .../config/external_aero_etl_drivaerml.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/external_aerodynamics/config/external_aero_etl_drivaerml.yaml b/examples/external_aerodynamics/config/external_aero_etl_drivaerml.yaml index ba59b13..fe583eb 100644 --- a/examples/external_aerodynamics/config/external_aero_etl_drivaerml.yaml +++ b/examples/external_aerodynamics/config/external_aero_etl_drivaerml.yaml @@ -28,7 +28,7 @@ etl: model_type: surface # produce data for which model? surface, volume, combined processing: - num_processes: 4 + num_processes: 12 args: {} validator: From 7f09fcca1e467429c35ccff61ab461d716012c92 Mon Sep 17 00:00:00 2001 From: Sai Krishnan Chandrasekar Date: Thu, 23 Oct 2025 11:56:06 -0700 Subject: [PATCH 3/4] Simplified sharding logic --- .../data_transformations.py | 32 +++-------- .../test_data_transformations.py | 54 +++++++++---------- 2 files changed, 34 insertions(+), 52 deletions(-) diff --git a/examples/external_aerodynamics/data_transformations.py b/examples/external_aerodynamics/data_transformations.py index 044ef48..b8ab1fd 100644 --- a/examples/external_aerodynamics/data_transformations.py +++ b/examples/external_aerodynamics/data_transformations.py @@ -230,7 +230,6 @@ def __init__( compression_level: int = 5, chunk_size_mb: float = 1.0, # Default 1MB chunk size chunks_per_shard: int = 1000, # Number of chunks per shard (~1GB shards with 1MB chunks) - sharding_threshold_gb: float = 1.0, # Enable sharding for arrays > 1GB ): super().__init__(cfg) # Zarr 3 codec configuration @@ -241,7 +240,6 @@ def __init__( ) self.chunk_size_mb = chunk_size_mb self.chunks_per_shard = chunks_per_shard - self.sharding_threshold_gb = sharding_threshold_gb # Warn if chunk size might be problematic if chunk_size_mb < 1.0: @@ -260,8 +258,8 @@ def __init__( def _prepare_array(self, array: np.ndarray) -> PreparedZarrArrayInfo: """Prepare array for 
Zarr storage with compression, chunking, and sharding. - Sharding is automatically enabled for large arrays (>sharding_threshold_gb) - to reduce the number of files created on disk. Small arrays use chunks only. + Sharding is always enabled to reduce the number of files created on disk. + Each shard contains approximately chunks_per_shard chunks (~1GB per shard by default). """ if array is None: return None @@ -274,32 +272,18 @@ def _prepare_array(self, array: np.ndarray) -> PreparedZarrArrayInfo: if len(shape) == 1: chunk_size = min(shape[0], target_chunk_size // item_size) chunks = (chunk_size,) + # For 1D arrays: shard contains chunks_per_shard chunks + shard_size = min(shape[0], chunks[0] * self.chunks_per_shard) + shards = (shard_size,) else: # For 2D arrays, try to keep rows together chunk_rows = min( shape[0], max(1, target_chunk_size // (item_size * shape[1])) ) chunks = (chunk_rows, shape[1]) - - # Calculate shards for large arrays to reduce file count - shards = None - array_size_bytes = array.nbytes - threshold_bytes = int(self.sharding_threshold_gb * 1024 * 1024 * 1024) - - if array_size_bytes > threshold_bytes: - if len(shape) == 1: - # For 1D arrays: shard contains chunks_per_shard chunks - shard_size = min(shape[0], chunks[0] * self.chunks_per_shard) - shards = (shard_size,) - else: - # For 2D arrays: extend along first dimension to group multiple chunk-rows - shard_rows = min(shape[0], chunks[0] * self.chunks_per_shard) - shards = (shard_rows, shape[1]) - - logging.info( - f"Sharding enabled for array of size {array_size_bytes / (1024**3):.2f} GB: " - f"chunks={chunks}, shards={shards}" - ) + # For 2D arrays: extend along first dimension to group multiple chunk-rows + shard_rows = min(shape[0], chunks[0] * self.chunks_per_shard) + shards = (shard_rows, shape[1]) return PreparedZarrArrayInfo( data=np.float32(array), diff --git a/tests/test_examples/test_external_aerodynamics/test_data_transformations.py 
b/tests/test_examples/test_external_aerodynamics/test_data_transformations.py index c392afa..83e00b1 100644 --- a/tests/test_examples/test_external_aerodynamics/test_data_transformations.py +++ b/tests/test_examples/test_external_aerodynamics/test_data_transformations.py @@ -384,38 +384,36 @@ def test_chunk_size_effect(self, sample_data_processed): large_data_num_chunks = np.ceil(100000 / large_data_elements_per_chunk) assert large_data_num_chunks == 1 - def test_sharding_disabled_for_small_arrays(self, sample_data_processed): - """Test that sharding is NOT enabled for small arrays below threshold.""" + def test_sharding_always_enabled(self, sample_data_processed): + """Test that sharding is always enabled for all arrays.""" config = ProcessingConfig(num_processes=1) transform = ExternalAerodynamicsZarrTransformation( config, chunk_size_mb=1.0, chunks_per_shard=1000, - sharding_threshold_gb=1.0, ) result = transform.transform(sample_data_processed) - # All arrays in sample_data_processed are small (<1GB), so no sharding - assert result.stl_coordinates.shards is None - assert result.stl_centers.shards is None - assert result.stl_faces.shards is None - assert result.stl_areas.shards is None - assert result.surface_mesh_centers.shards is None - assert result.surface_normals.shards is None - assert result.surface_areas.shards is None - assert result.surface_fields.shards is None - assert result.volume_mesh_centers.shards is None - assert result.volume_fields.shards is None - - def test_sharding_enabled_for_large_arrays(self): - """Test that sharding IS enabled for large arrays above threshold.""" + # All arrays should have sharding enabled + assert result.stl_coordinates.shards is not None + assert result.stl_centers.shards is not None + assert result.stl_faces.shards is not None + assert result.stl_areas.shards is not None + assert result.surface_mesh_centers.shards is not None + assert result.surface_normals.shards is not None + assert result.surface_areas.shards is 
not None + assert result.surface_fields.shards is not None + assert result.volume_mesh_centers.shards is not None + assert result.volume_fields.shards is not None + + def test_sharding_shape_for_large_arrays(self): + """Test that sharding shape is calculated correctly for large arrays.""" config = ProcessingConfig(num_processes=1) transform = ExternalAerodynamicsZarrTransformation( config, chunk_size_mb=1.0, chunks_per_shard=1000, - sharding_threshold_gb=1.0, ) # Create a large array > 1GB @@ -442,16 +440,19 @@ def test_sharding_enabled_for_large_arrays(self): result = transform.transform(large_data) - # Small arrays should have no sharding - assert result.stl_coordinates.shards is None - assert result.stl_centers.shards is None + # All arrays should have sharding enabled + assert result.stl_coordinates.shards is not None + assert result.stl_centers.shards is not None - # Large array should have sharding enabled + # Check large array shard shape assert result.volume_fields.shards is not None assert len(result.volume_fields.shards) == 2 # 2D array assert ( result.volume_fields.shards[1] == 3 ) # Second dimension should match array + # First dimension should be chunk_rows * chunks_per_shard (or array size if smaller) + expected_shard_rows = min(100_000_000, result.volume_fields.chunks[0] * 1000) + assert result.volume_fields.shards[0] == expected_shard_rows def test_sharding_calculation_1d_array(self): """Test correct shard shape calculation for 1D arrays.""" @@ -460,10 +461,9 @@ def test_sharding_calculation_1d_array(self): config, chunk_size_mb=1.0, chunks_per_shard=100, # Use smaller value for easier testing - sharding_threshold_gb=0.1, # Lower threshold for testing ) - # Create 1D array > 0.1GB (400MB as float64, 200MB as float32) + # Create 1D array (400MB as float64, 200MB as float32) # 50M elements * 8 bytes = 400 MB large_1d_array = np.random.rand(50_000_000).astype(np.float64) @@ -486,10 +486,9 @@ def test_sharding_calculation_2d_array(self): config, 
chunk_size_mb=1.0, chunks_per_shard=50, # Use smaller value for easier testing - sharding_threshold_gb=0.1, # Lower threshold for testing ) - # Create 2D array > 0.1GB (2.4GB as float64, 1.2GB as float32) + # Create 2D array (2.4GB as float64, 1.2GB as float32) # 100M rows * 3 cols * 8 bytes = 2.4 GB large_2d_array = np.random.rand(100_000_000, 3).astype(np.float64) @@ -511,8 +510,7 @@ def test_sharding_initialization_defaults(self): transform = ExternalAerodynamicsZarrTransformation(config) # Check defaults match Zarr recommendations - assert transform.chunks_per_shard == 1000 # Zarr recommended - assert transform.sharding_threshold_gb == 1.0 + assert transform.chunks_per_shard == 1000 # Zarr recommended (~1GB shards) assert transform.chunk_size_mb == 1.0 From 23e949eed8b6d1487faeca5012b0fc541c2ffe06 Mon Sep 17 00:00:00 2001 From: Sai Krishnan Chandrasekar Date: Thu, 23 Oct 2025 12:23:42 -0700 Subject: [PATCH 4/4] Fix sharding divisibility bug --- .../data_transformations.py | 18 ++++++- .../test_data_transformations.py | 53 +++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/examples/external_aerodynamics/data_transformations.py b/examples/external_aerodynamics/data_transformations.py index b8ab1fd..9e97d93 100644 --- a/examples/external_aerodynamics/data_transformations.py +++ b/examples/external_aerodynamics/data_transformations.py @@ -273,7 +273,14 @@ def _prepare_array(self, array: np.ndarray) -> PreparedZarrArrayInfo: chunk_size = min(shape[0], target_chunk_size // item_size) chunks = (chunk_size,) # For 1D arrays: shard contains chunks_per_shard chunks - shard_size = min(shape[0], chunks[0] * self.chunks_per_shard) + # Ensure shard size is a multiple of chunk size + ideal_shard_size = chunks[0] * self.chunks_per_shard + if shape[0] <= ideal_shard_size: + # Array fits in one shard - round up to nearest chunk multiple + num_chunks = (shape[0] + chunks[0] - 1) // chunks[0] + shard_size = num_chunks * chunks[0] + else: + shard_size 
= ideal_shard_size shards = (shard_size,) else: # For 2D arrays, try to keep rows together @@ -282,7 +289,14 @@ def _prepare_array(self, array: np.ndarray) -> PreparedZarrArrayInfo: ) chunks = (chunk_rows, shape[1]) # For 2D arrays: extend along first dimension to group multiple chunk-rows - shard_rows = min(shape[0], chunks[0] * self.chunks_per_shard) + # Ensure shard rows is a multiple of chunk rows + ideal_shard_rows = chunks[0] * self.chunks_per_shard + if shape[0] <= ideal_shard_rows: + # Array fits in one shard - round up to nearest chunk multiple + num_chunks = (shape[0] + chunks[0] - 1) // chunks[0] + shard_rows = num_chunks * chunks[0] + else: + shard_rows = ideal_shard_rows shards = (shard_rows, shape[1]) return PreparedZarrArrayInfo( diff --git a/tests/test_examples/test_external_aerodynamics/test_data_transformations.py b/tests/test_examples/test_external_aerodynamics/test_data_transformations.py index 83e00b1..79e1ea9 100644 --- a/tests/test_examples/test_external_aerodynamics/test_data_transformations.py +++ b/tests/test_examples/test_external_aerodynamics/test_data_transformations.py @@ -513,6 +513,59 @@ def test_sharding_initialization_defaults(self): assert transform.chunks_per_shard == 1000 # Zarr recommended (~1GB shards) assert transform.chunk_size_mb == 1.0 + def test_sharding_divisibility_requirement(self): + """Test that shard shapes are always divisible by chunk shapes.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation( + config, + chunk_size_mb=1.0, + chunks_per_shard=1000, + ) + + # Test with various array sizes that might not divide evenly + test_cases = [ + (1234, 3), # Odd number of rows + (999, 5), # Another odd number + (1000000, 4), # Large array + (50, 3), # Very small array + (262144 + 100, 3), # Just over one chunk + ] + + for rows, cols in test_cases: + array = np.random.rand(rows, cols).astype(np.float64) + result = transform._prepare_array(array) + + # Check that shards are 
divisible by chunks in each dimension + assert result.shards[0] % result.chunks[0] == 0, ( + f"Shard rows {result.shards[0]} not divisible by chunk rows {result.chunks[0]} " + f"for array shape ({rows}, {cols})" + ) + assert ( + result.shards[1] == result.chunks[1] + ), f"Shard cols {result.shards[1]} should equal chunk cols {result.chunks[1]}" + + def test_sharding_small_array_one_shard(self): + """Test that small arrays fit into a single shard with proper rounding.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation( + config, + chunk_size_mb=1.0, + chunks_per_shard=1000, + ) + + # Create small array that would need multiple chunks but < chunks_per_shard + # With 1MB chunks and float32 (4 bytes): ~262144 elements per chunk for 1D + # Create array with 500,000 elements (needs ~2 chunks) + small_array = np.random.rand(500_000).astype(np.float64) + result = transform._prepare_array(small_array) + + # Should have 1 shard that contains all chunks + num_chunks_needed = (500_000 + result.chunks[0] - 1) // result.chunks[0] + expected_shard_size = num_chunks_needed * result.chunks[0] + assert result.shards[0] == expected_shard_size + assert result.shards[0] >= 500_000 # Shard is big enough for array + assert result.shards[0] % result.chunks[0] == 0 # Properly divisible + class TestExternalAerodynamicsSTLTransformation: """Test the ExternalAerodynamicsSTLTransformation class."""