diff --git a/examples/external_aerodynamics/data_sources.py b/examples/external_aerodynamics/data_sources.py index 7c364f3..fffdb92 100644 --- a/examples/external_aerodynamics/data_sources.py +++ b/examples/external_aerodynamics/data_sources.py @@ -246,6 +246,7 @@ def _write_zarr( name=field, data=array_info.data, chunks=array_info.chunks, + shards=array_info.shards, compressors=array_info.compressor if array_info.compressor else None, ) @@ -264,6 +265,7 @@ def _write_zarr( name=field, data=array_info.data, chunks=array_info.chunks, + shards=array_info.shards, compressors=( array_info.compressor if array_info.compressor else None ), diff --git a/examples/external_aerodynamics/data_transformations.py b/examples/external_aerodynamics/data_transformations.py index 1c1bf0f..9e97d93 100644 --- a/examples/external_aerodynamics/data_transformations.py +++ b/examples/external_aerodynamics/data_transformations.py @@ -229,6 +229,7 @@ def __init__( compression_method: str = "zstd", compression_level: int = 5, chunk_size_mb: float = 1.0, # Default 1MB chunk size + chunks_per_shard: int = 1000, # Number of chunks per shard (~1GB shards with 1MB chunks) ): super().__init__(cfg) # Zarr 3 codec configuration @@ -238,6 +239,7 @@ def __init__( shuffle=zarr.codecs.BloscShuffle.shuffle, ) self.chunk_size_mb = chunk_size_mb + self.chunks_per_shard = chunks_per_shard # Warn if chunk size might be problematic if chunk_size_mb < 1.0: @@ -254,7 +256,11 @@ def __init__( ) def _prepare_array(self, array: np.ndarray) -> PreparedZarrArrayInfo: - """Prepare array for Zarr storage with compression and chunking.""" + """Prepare array for Zarr storage with compression, chunking, and sharding. + + Sharding is always enabled to reduce the number of files created on disk. + Each shard contains approximately chunks_per_shard chunks (~1GB per shard by default). 
+ """ if array is None: return None @@ -266,17 +272,38 @@ def _prepare_array(self, array: np.ndarray) -> PreparedZarrArrayInfo: if len(shape) == 1: chunk_size = min(shape[0], target_chunk_size // item_size) chunks = (chunk_size,) + # For 1D arrays: shard contains chunks_per_shard chunks + # Ensure shard size is a multiple of chunk size + ideal_shard_size = chunks[0] * self.chunks_per_shard + if shape[0] <= ideal_shard_size: + # Array fits in one shard - round up to nearest chunk multiple + num_chunks = (shape[0] + chunks[0] - 1) // chunks[0] + shard_size = num_chunks * chunks[0] + else: + shard_size = ideal_shard_size + shards = (shard_size,) else: # For 2D arrays, try to keep rows together chunk_rows = min( shape[0], max(1, target_chunk_size // (item_size * shape[1])) ) chunks = (chunk_rows, shape[1]) + # For 2D arrays: extend along first dimension to group multiple chunk-rows + # Ensure shard rows is a multiple of chunk rows + ideal_shard_rows = chunks[0] * self.chunks_per_shard + if shape[0] <= ideal_shard_rows: + # Array fits in one shard - round up to nearest chunk multiple + num_chunks = (shape[0] + chunks[0] - 1) // chunks[0] + shard_rows = num_chunks * chunks[0] + else: + shard_rows = ideal_shard_rows + shards = (shard_rows, shape[1]) return PreparedZarrArrayInfo( data=np.float32(array), chunks=chunks, compressor=self.compressor, + shards=shards, ) def transform( diff --git a/examples/external_aerodynamics/schemas.py b/examples/external_aerodynamics/schemas.py index 5d0db99..80689ad 100644 --- a/examples/external_aerodynamics/schemas.py +++ b/examples/external_aerodynamics/schemas.py @@ -102,6 +102,7 @@ class PreparedZarrArrayInfo: data: np.ndarray chunks: tuple[int, ...] 
compressor: zarr.abc.codec + shards: Optional[tuple[int, ...]] = None @dataclass(frozen=True) diff --git a/tests/test_examples/test_external_aerodynamics/test_data_transformations.py b/tests/test_examples/test_external_aerodynamics/test_data_transformations.py index 909ce32..79e1ea9 100644 --- a/tests/test_examples/test_external_aerodynamics/test_data_transformations.py +++ b/tests/test_examples/test_external_aerodynamics/test_data_transformations.py @@ -384,6 +384,188 @@ def test_chunk_size_effect(self, sample_data_processed): large_data_num_chunks = np.ceil(100000 / large_data_elements_per_chunk) assert large_data_num_chunks == 1 + def test_sharding_always_enabled(self, sample_data_processed): + """Test that sharding is always enabled for all arrays.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation( + config, + chunk_size_mb=1.0, + chunks_per_shard=1000, + ) + + result = transform.transform(sample_data_processed) + + # All arrays should have sharding enabled + assert result.stl_coordinates.shards is not None + assert result.stl_centers.shards is not None + assert result.stl_faces.shards is not None + assert result.stl_areas.shards is not None + assert result.surface_mesh_centers.shards is not None + assert result.surface_normals.shards is not None + assert result.surface_areas.shards is not None + assert result.surface_fields.shards is not None + assert result.volume_mesh_centers.shards is not None + assert result.volume_fields.shards is not None + + def test_sharding_shape_for_large_arrays(self): + """Test that sharding shape is calculated correctly for large arrays.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation( + config, + chunk_size_mb=1.0, + chunks_per_shard=1000, + ) + + # Create a large array > 1GB + # For float32, 1 GB = 1024^3 / 4 = 268,435,456 elements + # Create a 2D array: 300M elements / 3 columns = 100M rows + # This is 300M * 4 bytes = 1.2 GB 
+ large_array = np.random.rand(100_000_000, 3).astype(np.float64) + + metadata = ExternalAerodynamicsMetadata( + filename="test_large", + dataset_type=ModelType.COMBINED, + stream_velocity=30.0, + air_density=1.205, + ) + + large_data = ExternalAerodynamicsExtractedDataInMemory( + metadata=metadata, + stl_coordinates=np.array([[0, 0, 0], [1, 1, 1]], dtype=np.float64), + stl_centers=np.array([[0.5, 0.5, 0.5]], dtype=np.float64), + stl_faces=np.array([[0, 1, 2]], dtype=np.int32), + stl_areas=np.array([1.0], dtype=np.float64), + volume_fields=large_array, # Large array + ) + + result = transform.transform(large_data) + + # All arrays should have sharding enabled + assert result.stl_coordinates.shards is not None + assert result.stl_centers.shards is not None + + # Check large array shard shape + assert result.volume_fields.shards is not None + assert len(result.volume_fields.shards) == 2 # 2D array + assert ( + result.volume_fields.shards[1] == 3 + ) # Second dimension should match array + # First dimension should be chunk_rows * chunks_per_shard (or array size if smaller) + expected_shard_rows = min(100_000_000, result.volume_fields.chunks[0] * 1000) + assert result.volume_fields.shards[0] == expected_shard_rows + + def test_sharding_calculation_1d_array(self): + """Test correct shard shape calculation for 1D arrays.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation( + config, + chunk_size_mb=1.0, + chunks_per_shard=100, # Use smaller value for easier testing + ) + + # Create 1D array (400MB as float64, 200MB as float32) + # 50M elements * 8 bytes = 400 MB + large_1d_array = np.random.rand(50_000_000).astype(np.float64) + + result = transform._prepare_array(large_1d_array) + + # Check that sharding is enabled + assert result.shards is not None + assert len(result.shards) == 1 + + # For 1D array: shard_size = chunks[0] * chunks_per_shard + # With 1MB chunks and float32 (4 bytes): chunk_size = 1MB / 4 = 262144 elements 
+ # So shards should be ~26,214,400 elements (100 chunks) + expected_shard_size = result.chunks[0] * 100 + assert result.shards[0] == expected_shard_size + + def test_sharding_calculation_2d_array(self): + """Test correct shard shape calculation for 2D arrays.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation( + config, + chunk_size_mb=1.0, + chunks_per_shard=50, # Use smaller value for easier testing + ) + + # Create 2D array (2.4GB as float64, 1.2GB as float32) + # 100M rows * 3 cols * 8 bytes = 2.4 GB + large_2d_array = np.random.rand(100_000_000, 3).astype(np.float64) + + result = transform._prepare_array(large_2d_array) + + # Check that sharding is enabled + assert result.shards is not None + assert len(result.shards) == 2 + + # For 2D arrays: shard_rows = chunk_rows * chunks_per_shard + # Second dimension should match array shape + expected_shard_rows = result.chunks[0] * 50 + assert result.shards[0] == expected_shard_rows + assert result.shards[1] == 3 # Matches second dimension + + def test_sharding_initialization_defaults(self): + """Test that default sharding parameters are set correctly.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation(config) + + # Check defaults match Zarr recommendations + assert transform.chunks_per_shard == 1000 # Zarr recommended (~1GB shards) + assert transform.chunk_size_mb == 1.0 + + def test_sharding_divisibility_requirement(self): + """Test that shard shapes are always divisible by chunk shapes.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation( + config, + chunk_size_mb=1.0, + chunks_per_shard=1000, + ) + + # Test with various array sizes that might not divide evenly + test_cases = [ + (1234, 3), # Odd number of rows + (999, 5), # Another odd number + (1000000, 4), # Large array + (50, 3), # Very small array + (262144 + 100, 3), # Just over one chunk + ] + + for rows, cols 
in test_cases: + array = np.random.rand(rows, cols).astype(np.float64) + result = transform._prepare_array(array) + + # Check that shards are divisible by chunks in each dimension + assert result.shards[0] % result.chunks[0] == 0, ( + f"Shard rows {result.shards[0]} not divisible by chunk rows {result.chunks[0]} " + f"for array shape ({rows}, {cols})" + ) + assert ( + result.shards[1] == result.chunks[1] + ), f"Shard cols {result.shards[1]} should equal chunk cols {result.chunks[1]}" + + def test_sharding_small_array_one_shard(self): + """Test that small arrays fit into a single shard with proper rounding.""" + config = ProcessingConfig(num_processes=1) + transform = ExternalAerodynamicsZarrTransformation( + config, + chunk_size_mb=1.0, + chunks_per_shard=1000, + ) + + # Create small array that would need multiple chunks but < chunks_per_shard + # With 1MB chunks and float32 storage (4 bytes): ~262144 elements per chunk for 1D + # Create array with 500,000 elements (needs ~2 chunks) + small_array = np.random.rand(500_000).astype(np.float64) + result = transform._prepare_array(small_array) + + # Should have 1 shard that contains all chunks + num_chunks_needed = (500_000 + result.chunks[0] - 1) // result.chunks[0] + expected_shard_size = num_chunks_needed * result.chunks[0] + assert result.shards[0] == expected_shard_size + assert result.shards[0] >= 500_000 # Shard is big enough for array + assert result.shards[0] % result.chunks[0] == 0 # Properly divisible + + class TestExternalAerodynamicsSTLTransformation: """Test the ExternalAerodynamicsSTLTransformation class."""