Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/dlt_iceberg/schema_casting.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,17 @@ def cast_table_safe(
f"to schema with {len(target_schema)} fields"
)

# Add null columns for any field in the target schema missing from the source.
# This handles sparse incoming data where the Iceberg table has columns that
# the current batch doesn't contain.
source_field_names = {field.name for field in table.schema}
for target_field in target_schema:
if target_field.name not in source_field_names:
table = table.append_column(
target_field,
pa.nulls(len(table), type=target_field.type),
)

# Reorder columns to match target schema before casting
# PyArrow's cast() requires fields to be in the same order
target_field_names = [field.name for field in target_schema]
Expand Down
41 changes: 28 additions & 13 deletions src/dlt_iceberg/schema_evolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,6 @@ def validate_schema_changes(
"""
errors = []

# Check dropped columns
if dropped_fields and not allow_column_drops:
errors.append(
f"Columns dropped (not safe): {', '.join(dropped_fields)}. "
f"Dropping columns is not supported by default to prevent data loss."
)

# Check type changes
for field_name, old_type, new_type in type_changes:
if not can_promote_type(old_type, new_type):
Expand All @@ -169,7 +162,8 @@ def validate_schema_changes(
def apply_schema_evolution(
table,
added_fields: List[NestedField],
type_changes: List[Tuple[str, IcebergType, IcebergType]]
type_changes: List[Tuple[str, IcebergType, IcebergType]],
dropped_fields: Optional[List[str]] = None,
) -> None:
"""
Apply schema evolution changes to an Iceberg table.
Expand All @@ -178,14 +172,16 @@ def apply_schema_evolution(
table: PyIceberg table instance
added_fields: New fields to add
type_changes: Type promotions to apply
dropped_fields: Fields to remove from the schema
"""
if not added_fields and not type_changes:
if not added_fields and not type_changes and not dropped_fields:
logger.info("No schema changes to apply")
return

logger.info(
f"Applying schema evolution: "
f"{len(added_fields)} new columns, {len(type_changes)} type promotions"
f"{len(added_fields)} new columns, {len(type_changes)} type promotions, "
f"{len(dropped_fields or [])} dropped columns"
)

# Apply changes using update_schema transaction
Expand All @@ -208,6 +204,11 @@ def apply_schema_evolution(
field_type=new_type
)

# Delete dropped columns
for field_name in (dropped_fields or []):
logger.info(f" Dropping column: {field_name}")
update.delete_column(field_name)

logger.info("Schema evolution applied successfully")


Expand Down Expand Up @@ -245,7 +246,13 @@ def evolve_schema_if_needed(
if type_changes:
logger.info(f"Detected {len(type_changes)} type changes: {[(name, str(old), str(new)) for name, old, new in type_changes]}")
if dropped_fields:
logger.warning(f"Detected {len(dropped_fields)} dropped columns: {dropped_fields}")
if allow_column_drops:
logger.info(f"Detected {len(dropped_fields)} columns to drop: {dropped_fields}")
else:
logger.warning(
f"Detected {len(dropped_fields)} sparse columns (not in incoming data): "
f"{dropped_fields}. Columns will remain in schema; new rows will have nulls."
)

# If no changes, nothing to do
if not added_fields and not type_changes and not dropped_fields:
Expand All @@ -255,7 +262,15 @@ def evolve_schema_if_needed(
# Validate changes are safe
validate_schema_changes(added_fields, type_changes, dropped_fields, allow_column_drops)

# Apply evolution
apply_schema_evolution(table, added_fields, type_changes)
# When allow_column_drops=False and only dropped fields were detected,
# the table schema is already correct — no evolution needed.
if not allow_column_drops and not added_fields and not type_changes:
return False

# Apply evolution, passing dropped_fields only when allow_column_drops=True
apply_schema_evolution(
table, added_fields, type_changes,
dropped_fields=dropped_fields if allow_column_drops else None,
)

return True
10 changes: 3 additions & 7 deletions tests/test_schema_evolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,13 +280,9 @@ def test_schema_evolution_unsafe_changes():
assert "value" in dropped, "Should detect 'value' was dropped"
print(f" Detected dropped columns: {dropped}")

# Should raise error by default
try:
validate_schema_changes(added, type_changes, dropped, allow_column_drops=False)
assert False, "Should have raised SchemaEvolutionError for dropped column"
except SchemaEvolutionError as e:
assert "value" in str(e).lower() or "dropped" in str(e).lower()
print(f" Correctly rejected: {str(e)}")
# Dropped columns should not raise - sparse data is handled by filling nulls
validate_schema_changes(added, type_changes, dropped, allow_column_drops=False)
print(f" Sparse columns accepted (nulls will be filled at write time)")

# Test 2: Unsafe type narrowing should be detected
print("\nTest 2: Unsafe type narrowing detection")
Expand Down
Loading
Loading