diff --git a/src/dlt_iceberg/schema_casting.py b/src/dlt_iceberg/schema_casting.py index c004fe8..1c166fd 100644 --- a/src/dlt_iceberg/schema_casting.py +++ b/src/dlt_iceberg/schema_casting.py @@ -436,6 +436,17 @@ def cast_table_safe( f"to schema with {len(target_schema)} fields" ) + # Add null columns for any field in the target schema missing from the source. + # This handles sparse incoming data where the Iceberg table has columns that + # the current batch doesn't contain. + source_field_names = {field.name for field in table.schema} + for target_field in target_schema: + if target_field.name not in source_field_names: + table = table.append_column( + target_field, + pa.nulls(len(table), type=target_field.type), + ) + # Reorder columns to match target schema before casting # PyArrow's cast() requires fields to be in the same order target_field_names = [field.name for field in target_schema] diff --git a/src/dlt_iceberg/schema_evolution.py b/src/dlt_iceberg/schema_evolution.py index 2b37fb5..4929ef1 100644 --- a/src/dlt_iceberg/schema_evolution.py +++ b/src/dlt_iceberg/schema_evolution.py @@ -144,13 +144,6 @@ def validate_schema_changes( """ errors = [] - # Check dropped columns - if dropped_fields and not allow_column_drops: - errors.append( - f"Columns dropped (not safe): {', '.join(dropped_fields)}. " - f"Dropping columns is not supported by default to prevent data loss." - ) - # Check type changes for field_name, old_type, new_type in type_changes: if not can_promote_type(old_type, new_type): @@ -169,7 +162,8 @@ def validate_schema_changes( def apply_schema_evolution( table, added_fields: List[NestedField], - type_changes: List[Tuple[str, IcebergType, IcebergType]] + type_changes: List[Tuple[str, IcebergType, IcebergType]], + dropped_fields: Optional[List[str]] = None, ) -> None: """ Apply schema evolution changes to an Iceberg table. @@ -178,14 +172,16 @@ def apply_schema_evolution( table: PyIceberg table instance added_fields: New fields to add type_changes: Type promotions to apply + dropped_fields: Fields to remove from the schema """ - if not added_fields and not type_changes: + if not added_fields and not type_changes and not dropped_fields: logger.info("No schema changes to apply") return logger.info( f"Applying schema evolution: " - f"{len(added_fields)} new columns, {len(type_changes)} type promotions" + f"{len(added_fields)} new columns, {len(type_changes)} type promotions, " + f"{len(dropped_fields or [])} dropped columns" ) # Apply changes using update_schema transaction @@ -208,6 +204,11 @@ def apply_schema_evolution( field_type=new_type ) + # Delete dropped columns + for field_name in (dropped_fields or []): + logger.info(f" Dropping column: {field_name}") + update.delete_column(field_name) + logger.info("Schema evolution applied successfully") @@ -245,7 +246,13 @@ def evolve_schema_if_needed( if type_changes: logger.info(f"Detected {len(type_changes)} type changes: {[(name, str(old), str(new)) for name, old, new in type_changes]}") if dropped_fields: - logger.warning(f"Detected {len(dropped_fields)} dropped columns: {dropped_fields}") + if allow_column_drops: + logger.info(f"Detected {len(dropped_fields)} columns to drop: {dropped_fields}") + else: + logger.warning( + f"Detected {len(dropped_fields)} sparse columns (not in incoming data): " + f"{dropped_fields}. Columns will remain in schema; new rows will have nulls." + ) # If no changes, nothing to do if not added_fields and not type_changes and not dropped_fields: @@ -255,7 +262,15 @@ def evolve_schema_if_needed( # Validate changes are safe validate_schema_changes(added_fields, type_changes, dropped_fields, allow_column_drops) - # Apply evolution - apply_schema_evolution(table, added_fields, type_changes) + # When allow_column_drops=False and only dropped fields were detected, + # the table schema is already correct — no evolution needed. + if not allow_column_drops and not added_fields and not type_changes: + return False + + # Apply evolution, passing dropped_fields only when allow_column_drops=True + apply_schema_evolution( + table, added_fields, type_changes, + dropped_fields=dropped_fields if allow_column_drops else None, + ) return True diff --git a/tests/test_schema_evolution.py b/tests/test_schema_evolution.py index 873143f..da3db23 100644 --- a/tests/test_schema_evolution.py +++ b/tests/test_schema_evolution.py @@ -280,13 +280,9 @@ def test_schema_evolution_unsafe_changes(): assert "value" in dropped, "Should detect 'value' was dropped" print(f" Detected dropped columns: {dropped}") - # Should raise error by default - try: - validate_schema_changes(added, type_changes, dropped, allow_column_drops=False) - assert False, "Should have raised SchemaEvolutionError for dropped column" - except SchemaEvolutionError as e: - assert "value" in str(e).lower() or "dropped" in str(e).lower() - print(f" Correctly rejected: {str(e)}") + # Dropped columns should not raise - sparse data is handled by filling nulls + validate_schema_changes(added, type_changes, dropped, allow_column_drops=False) + print(f" Sparse columns accepted (nulls will be filled at write time)") # Test 2: Unsafe type narrowing should be detected print("\nTest 2: Unsafe type narrowing detection") diff --git a/tests/test_sparse_schema.py b/tests/test_sparse_schema.py new file mode 100644 index 0000000..854ee53 --- /dev/null +++ b/tests/test_sparse_schema.py @@ -0,0 +1,277 @@ +""" +Tests for sparse schema support and allow_column_drops implementation. + +Covers three behaviors: +1. Sparse incoming data (fewer columns than existing schema) should not raise + SchemaEvolutionError. Columns remain in the table; new rows get nulls. +2. cast_table_safe should fill missing columns with nulls before casting. +3. allow_column_drops=True should remove columns from the Iceberg schema via + apply_schema_evolution. +""" + +import pytest +import tempfile +import shutil +import pyarrow as pa +from pyiceberg.schema import Schema +from pyiceberg.types import ( + NestedField, + LongType, + StringType, + DoubleType, +) + +from dlt_iceberg.schema_evolution import ( + compare_schemas, + validate_schema_changes, + apply_schema_evolution, + evolve_schema_if_needed, + SchemaEvolutionError, +) +from dlt_iceberg.schema_casting import cast_table_safe, CastingError + + +# --------------------------------------------------------------------------- +# Bug 1: Sparse data should not raise SchemaEvolutionError +# --------------------------------------------------------------------------- + + +def test_validate_sparse_data_allow_column_drops_false(): + """Sparse incoming data with allow_column_drops=False leaves columns + in the schema and fills nulls at write time.""" + added = [] + type_changes = [] + dropped = ["extra_col_a", "extra_col_b"] + + validate_schema_changes( + added, type_changes, dropped, allow_column_drops=False + ) + + +def test_validate_sparse_data_allow_column_drops_true(): + """allow_column_drops=True does not raise; columns are removed via + apply_schema_evolution instead.""" + added = [] + type_changes = [] + dropped = ["extra_col_a"] + + validate_schema_changes( + added, type_changes, dropped, allow_column_drops=True + ) + + +def test_evolve_schema_if_needed_sparse_data(): + """evolve_schema_if_needed with sparse data and allow_column_drops=False + preserves the full table schema without error.""" + temp_dir = tempfile.mkdtemp() + + print(f"\nTest: Sparse data - evolve_schema_if_needed") + print(f" Temp dir: {temp_dir}") + + try: + from pyiceberg.catalog import load_catalog + + catalog = load_catalog( + "test", + type="sql", + uri=f"sqlite:///{temp_dir}/catalog.db", + warehouse=f"file://{temp_dir}/warehouse", + ) + catalog.create_namespace("ns") + + wide_schema = Schema( + NestedField(1, "id", LongType(), required=False), + NestedField(2, "name", StringType(), required=False), + NestedField(3, "score", DoubleType(), required=False), + ) + table = catalog.create_table("ns.wide_table", schema=wide_schema) + print(f" Created table with schema: {[f.name for f in wide_schema.fields]}") + + narrow_schema = Schema( + NestedField(1, "id", LongType(), required=False), + NestedField(2, "name", StringType(), required=False), + ) + print(f" Incoming schema (sparse): {[f.name for f in narrow_schema.fields]}") + + result = evolve_schema_if_needed( + table, narrow_schema, allow_column_drops=False + ) + + refreshed = catalog.load_table("ns.wide_table") + field_names = [f.name for f in refreshed.schema().fields] + assert "score" in field_names, ( + "score column should remain in schema for sparse data" + ) + print(f" Table schema preserved: {field_names}") + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + +# --------------------------------------------------------------------------- +# Bug 2: cast_table_safe should fill missing columns with nulls +# --------------------------------------------------------------------------- + + +def test_cast_missing_column_filled_with_nulls(): + """A column present in the target but absent in the source is filled + with nulls.""" + source_schema = pa.schema([ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + ]) + target_schema = pa.schema([ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + pa.field("score", pa.float64()), + ]) + + table = pa.table( + {"id": [1, 2, 3], "name": ["a", "b", "c"]}, + schema=source_schema, + ) + + result = cast_table_safe(table, target_schema, strict=False) + + assert len(result) == 3 + assert result.schema == target_schema + assert result.column("score").to_pylist() == [None, None, None] + + +def test_cast_multiple_missing_columns_filled(): + """Multiple missing columns are all filled with nulls.""" + source_schema = pa.schema([ + pa.field("id", pa.int64()), + ]) + target_schema = pa.schema([ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + pa.field("score", pa.float64()), + pa.field("active", pa.bool_()), + ]) + + table = pa.table({"id": [1, 2]}, schema=source_schema) + + result = cast_table_safe(table, target_schema, strict=False) + + assert len(result) == 2 + assert result.schema == target_schema + assert result.column("name").to_pylist() == [None, None] + assert result.column("score").to_pylist() == [None, None] + assert result.column("active").to_pylist() == [None, None] + + +def test_cast_missing_columns_work_in_strict_mode(): + """Missing columns are warnings, not errors, so strict mode succeeds.""" + source_schema = pa.schema([ + pa.field("id", pa.int64()), + ]) + target_schema = pa.schema([ + pa.field("id", pa.int64()), + pa.field("value", pa.int64()), + ]) + + table = pa.table({"id": [10]}, schema=source_schema) + + result = cast_table_safe(table, target_schema, strict=True) + + assert len(result) == 1 + assert result.schema == target_schema + assert result.column("value").to_pylist() == [None] + + +# --------------------------------------------------------------------------- +# Bug 3: allow_column_drops=True should actually remove columns +# --------------------------------------------------------------------------- + + +def test_apply_schema_evolution_deletes_columns(): + """apply_schema_evolution removes columns listed in dropped_fields.""" + temp_dir = tempfile.mkdtemp() + + print(f"\nTest: apply_schema_evolution with dropped_fields") + print(f" Temp dir: {temp_dir}") + + try: + from pyiceberg.catalog import load_catalog + + catalog = load_catalog( + "test", + type="sql", + uri=f"sqlite:///{temp_dir}/catalog.db", + warehouse=f"file://{temp_dir}/warehouse", + ) + catalog.create_namespace("ns") + + schema = Schema( + NestedField(1, "id", LongType(), required=False), + NestedField(2, "name", StringType(), required=False), + NestedField(3, "obsolete", StringType(), required=False), + ) + table = catalog.create_table("ns.drop_test", schema=schema) + print(f" Created table with schema: {[f.name for f in schema.fields]}") + + apply_schema_evolution( + table, + added_fields=[], + type_changes=[], + dropped_fields=["obsolete"], + ) + + refreshed = catalog.load_table("ns.drop_test") + field_names = [f.name for f in refreshed.schema().fields] + assert "obsolete" not in field_names + assert "id" in field_names + assert "name" in field_names + print(f" Schema after drop: {field_names}") + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + +def test_evolve_schema_drops_columns_when_allowed(): + """evolve_schema_if_needed with allow_column_drops=True removes columns + missing from the incoming schema.""" + temp_dir = tempfile.mkdtemp() + + print(f"\nTest: evolve_schema_if_needed with allow_column_drops=True") + print(f" Temp dir: {temp_dir}") + + try: + from pyiceberg.catalog import load_catalog + + catalog = load_catalog( + "test", + type="sql", + uri=f"sqlite:///{temp_dir}/catalog.db", + warehouse=f"file://{temp_dir}/warehouse", + ) + catalog.create_namespace("ns") + + schema = Schema( + NestedField(1, "id", LongType(), required=False), + NestedField(2, "name", StringType(), required=False), + NestedField(3, "to_drop", StringType(), required=False), + ) + table = catalog.create_table("ns.evolve_drop", schema=schema) + print(f" Created table with schema: {[f.name for f in schema.fields]}") + + narrow_schema = Schema( + NestedField(1, "id", LongType(), required=False), + NestedField(2, "name", StringType(), required=False), + ) + print(f" Incoming schema: {[f.name for f in narrow_schema.fields]}") + + evolved = evolve_schema_if_needed( + table, narrow_schema, allow_column_drops=True + ) + assert evolved, "Schema should have evolved (column dropped)" + + refreshed = catalog.load_table("ns.evolve_drop") + field_names = [f.name for f in refreshed.schema().fields] + assert "to_drop" not in field_names + print(f" Schema after evolution: {field_names}") + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"])