Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/external_aerodynamics/data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ def _write_zarr(
name=field,
data=array_info.data,
chunks=array_info.chunks,
shards=array_info.shards,
compressors=array_info.compressor if array_info.compressor else None,
)

Expand All @@ -264,6 +265,7 @@ def _write_zarr(
name=field,
data=array_info.data,
chunks=array_info.chunks,
shards=array_info.shards,
compressors=(
array_info.compressor if array_info.compressor else None
),
Expand Down
29 changes: 28 additions & 1 deletion examples/external_aerodynamics/data_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ def __init__(
compression_method: str = "zstd",
compression_level: int = 5,
chunk_size_mb: float = 1.0, # Default 1MB chunk size
chunks_per_shard: int = 1000, # Number of chunks per shard (~1GB shards with 1MB chunks)
):
super().__init__(cfg)
# Zarr 3 codec configuration
Expand All @@ -238,6 +239,7 @@ def __init__(
shuffle=zarr.codecs.BloscShuffle.shuffle,
)
self.chunk_size_mb = chunk_size_mb
self.chunks_per_shard = chunks_per_shard

# Warn if chunk size might be problematic
if chunk_size_mb < 1.0:
Expand All @@ -254,7 +256,11 @@ def __init__(
)

def _prepare_array(self, array: np.ndarray) -> PreparedZarrArrayInfo:
"""Prepare array for Zarr storage with compression and chunking."""
"""Prepare array for Zarr storage with compression, chunking, and sharding.

Sharding is always enabled to reduce the number of files created on disk.
Each shard contains approximately chunks_per_shard chunks (~1GB per shard by default).
"""
if array is None:
return None

Expand All @@ -266,17 +272,38 @@ def _prepare_array(self, array: np.ndarray) -> PreparedZarrArrayInfo:
if len(shape) == 1:
chunk_size = min(shape[0], target_chunk_size // item_size)
chunks = (chunk_size,)
# For 1D arrays: shard contains chunks_per_shard chunks
# Ensure shard size is a multiple of chunk size
ideal_shard_size = chunks[0] * self.chunks_per_shard
if shape[0] <= ideal_shard_size:
# Array fits in one shard - round up to nearest chunk multiple
num_chunks = (shape[0] + chunks[0] - 1) // chunks[0]
shard_size = num_chunks * chunks[0]
else:
shard_size = ideal_shard_size
shards = (shard_size,)
else:
# For 2D arrays, try to keep rows together
chunk_rows = min(
shape[0], max(1, target_chunk_size // (item_size * shape[1]))
)
chunks = (chunk_rows, shape[1])
# For 2D arrays: extend along first dimension to group multiple chunk-rows
# Ensure shard rows is a multiple of chunk rows
ideal_shard_rows = chunks[0] * self.chunks_per_shard
if shape[0] <= ideal_shard_rows:
# Array fits in one shard - round up to nearest chunk multiple
num_chunks = (shape[0] + chunks[0] - 1) // chunks[0]
shard_rows = num_chunks * chunks[0]
else:
shard_rows = ideal_shard_rows
shards = (shard_rows, shape[1])

return PreparedZarrArrayInfo(
data=np.float32(array),
chunks=chunks,
compressor=self.compressor,
shards=shards,
)

def transform(
Expand Down
1 change: 1 addition & 0 deletions examples/external_aerodynamics/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class PreparedZarrArrayInfo:
data: np.ndarray
chunks: tuple[int, ...]
compressor: zarr.abc.codec
shards: Optional[tuple[int, ...]] = None


@dataclass(frozen=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,188 @@ def test_chunk_size_effect(self, sample_data_processed):
large_data_num_chunks = np.ceil(100000 / large_data_elements_per_chunk)
assert large_data_num_chunks == 1

def test_sharding_always_enabled(self, sample_data_processed):
    """Test that sharding is always enabled for all arrays."""
    config = ProcessingConfig(num_processes=1)
    transform = ExternalAerodynamicsZarrTransformation(
        config,
        chunk_size_mb=1.0,
        chunks_per_shard=1000,
    )

    result = transform.transform(sample_data_processed)

    # Every prepared array must carry a shard shape.
    sharded_fields = (
        "stl_coordinates",
        "stl_centers",
        "stl_faces",
        "stl_areas",
        "surface_mesh_centers",
        "surface_normals",
        "surface_areas",
        "surface_fields",
        "volume_mesh_centers",
        "volume_fields",
    )
    for field_name in sharded_fields:
        assert getattr(result, field_name).shards is not None, field_name
def test_sharding_shape_for_large_arrays(self):
    """Test shard shape calculation for arrays spanning multiple shards.

    Uses a small ``chunks_per_shard`` so that a modest array already
    exceeds one shard. The original version allocated a 100M x 3 float64
    array — 2.4 GB, not the 1.2 GB its comment claimed by assuming
    float32 — which made this unit test needlessly slow and memory
    hungry while exercising exactly the same code path.
    """
    config = ProcessingConfig(num_processes=1)
    chunks_per_shard = 10
    transform = ExternalAerodynamicsZarrTransformation(
        config,
        chunk_size_mb=1.0,
        chunks_per_shard=chunks_per_shard,
    )

    # 1M rows x 3 cols (~24 MB as float64) is guaranteed to exceed
    # 10 chunks of at most 1 MB each, triggering the multi-shard branch.
    n_rows = 1_000_000
    large_array = np.zeros((n_rows, 3), dtype=np.float64)

    metadata = ExternalAerodynamicsMetadata(
        filename="test_large",
        dataset_type=ModelType.COMBINED,
        stream_velocity=30.0,
        air_density=1.205,
    )

    large_data = ExternalAerodynamicsExtractedDataInMemory(
        metadata=metadata,
        stl_coordinates=np.array([[0, 0, 0], [1, 1, 1]], dtype=np.float64),
        stl_centers=np.array([[0.5, 0.5, 0.5]], dtype=np.float64),
        stl_faces=np.array([[0, 1, 2]], dtype=np.int32),
        stl_areas=np.array([1.0], dtype=np.float64),
        volume_fields=large_array,  # Spans multiple shards
    )

    result = transform.transform(large_data)

    # All arrays should have sharding enabled.
    assert result.stl_coordinates.shards is not None
    assert result.stl_centers.shards is not None

    # The large array keeps its second dimension intact and caps the
    # first shard dimension at chunk_rows * chunks_per_shard.
    assert result.volume_fields.shards is not None
    assert len(result.volume_fields.shards) == 2  # 2D array
    assert (
        result.volume_fields.shards[1] == 3
    )  # Second dimension should match array
    expected_shard_rows = min(
        n_rows, result.volume_fields.chunks[0] * chunks_per_shard
    )
    assert result.volume_fields.shards[0] == expected_shard_rows
    # Sanity: the array really is larger than one shard.
    assert n_rows > expected_shard_rows
def test_sharding_calculation_1d_array(self):
    """Test correct shard shape calculation for 1D arrays.

    When the array is larger than one shard, the shard size must equal
    ``chunks[0] * chunks_per_shard``. A small ``chunks_per_shard`` keeps
    the fixture tiny; the original test allocated 50M float64 elements
    (400 MB) to exercise the same branch.
    """
    config = ProcessingConfig(num_processes=1)
    chunks_per_shard = 10
    transform = ExternalAerodynamicsZarrTransformation(
        config,
        chunk_size_mb=1.0,
        chunks_per_shard=chunks_per_shard,
    )

    # 3M elements (~24 MB as float64) exceeds 10 chunks of <= 1 MB each.
    num_elements = 3_000_000
    large_1d_array = np.zeros(num_elements, dtype=np.float64)

    result = transform._prepare_array(large_1d_array)

    # Check that sharding is enabled.
    assert result.shards is not None
    assert len(result.shards) == 1

    # Multi-shard case: each shard spans exactly chunks_per_shard chunks.
    expected_shard_size = result.chunks[0] * chunks_per_shard
    assert result.shards[0] == expected_shard_size
    # Sanity: the array really is larger than one shard.
    assert num_elements > expected_shard_size
def test_sharding_calculation_2d_array(self):
    """Test correct shard shape calculation for 2D arrays.

    For 2D arrays the shard extends only along the first dimension:
    ``shard_rows = chunk_rows * chunks_per_shard`` and the second shard
    dimension equals the array's column count. A small
    ``chunks_per_shard`` keeps the fixture tiny; the original test
    allocated a 100M x 3 float64 array (2.4 GB) to hit the same branch.
    """
    config = ProcessingConfig(num_processes=1)
    chunks_per_shard = 10
    transform = ExternalAerodynamicsZarrTransformation(
        config,
        chunk_size_mb=1.0,
        chunks_per_shard=chunks_per_shard,
    )

    # 1M rows x 3 cols (~24 MB as float64) exceeds 10 chunk-rows
    # of <= 1 MB each, triggering the multi-shard branch.
    n_rows = 1_000_000
    large_2d_array = np.zeros((n_rows, 3), dtype=np.float64)

    result = transform._prepare_array(large_2d_array)

    # Check that sharding is enabled.
    assert result.shards is not None
    assert len(result.shards) == 2

    # shard_rows = chunk_rows * chunks_per_shard; columns match the array.
    expected_shard_rows = result.chunks[0] * chunks_per_shard
    assert result.shards[0] == expected_shard_rows
    assert result.shards[1] == 3  # Matches second dimension
    # Sanity: the array really is larger than one shard.
    assert n_rows > expected_shard_rows
def test_sharding_initialization_defaults(self):
    """Test that default sharding parameters are set correctly."""
    transform = ExternalAerodynamicsZarrTransformation(
        ProcessingConfig(num_processes=1)
    )

    # Defaults follow the Zarr recommendation of ~1 GB shards built
    # from 1 MB chunks.
    assert transform.chunks_per_shard == 1000
    assert transform.chunk_size_mb == 1.0
def test_sharding_divisibility_requirement(self):
    """Test that shard shapes are always divisible by chunk shapes."""
    transform = ExternalAerodynamicsZarrTransformation(
        ProcessingConfig(num_processes=1),
        chunk_size_mb=1.0,
        chunks_per_shard=1000,
    )

    # Row counts chosen so they rarely divide evenly into chunk rows.
    shapes = [
        (1234, 3),  # Odd number of rows
        (999, 5),  # Another odd number
        (1000000, 4),  # Large array
        (50, 3),  # Very small array
        (262144 + 100, 3),  # Not a multiple of the chunk row count
    ]

    for rows, cols in shapes:
        prepared = transform._prepare_array(
            np.random.rand(rows, cols).astype(np.float64)
        )

        # Each shard dimension must be a whole multiple of the
        # corresponding chunk dimension.
        assert prepared.shards[0] % prepared.chunks[0] == 0, (
            f"Shard rows {prepared.shards[0]} not divisible by chunk rows {prepared.chunks[0]} "
            f"for array shape ({rows}, {cols})"
        )
        assert (
            prepared.shards[1] == prepared.chunks[1]
        ), f"Shard cols {prepared.shards[1]} should equal chunk cols {prepared.chunks[1]}"
def test_sharding_small_array_one_shard(self):
    """Test that small arrays fit into a single shard with proper rounding."""
    transform = ExternalAerodynamicsZarrTransformation(
        ProcessingConfig(num_processes=1),
        chunk_size_mb=1.0,
        chunks_per_shard=1000,
    )

    # 500,000 elements need a few chunks but far fewer than
    # chunks_per_shard, so everything should land in one shard.
    n_elements = 500_000
    prepared = transform._prepare_array(
        np.random.rand(n_elements).astype(np.float64)
    )

    # The single shard is the chunk count (rounded up) times chunk size.
    chunk_len = prepared.chunks[0]
    chunks_needed = -(-n_elements // chunk_len)  # Ceiling division
    assert prepared.shards[0] == chunks_needed * chunk_len
    assert prepared.shards[0] >= n_elements  # Shard is big enough for array
    assert prepared.shards[0] % chunk_len == 0  # Properly divisible

class TestExternalAerodynamicsSTLTransformation:
"""Test the ExternalAerodynamicsSTLTransformation class."""
Expand Down