Comprehensive test suite for the dlt-iceberg destination covering SQL catalogs (SQLite) and REST catalogs (Nessie).
```bash
# Run all tests (except integration tests requiring Docker)
uv run pytest tests/ -m "not integration" -v

# Run everything including integration tests
docker compose up -d
uv run pytest tests/ -v
```

### Schema Conversion - `tests/test_schema_converter.py`
Tests dlt → Iceberg schema conversion for all data types.
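The shape of the conversion can be sketched with a plain mapping table. The entries below are illustrative only, not the module's actual mapping:

```python
# Illustrative dlt -> Iceberg type mapping (hypothetical entries,
# not the module's actual table).
DLT_TO_ICEBERG = {
    "bigint": "long",
    "double": "double",
    "bool": "boolean",
    "text": "string",
    "timestamp": "timestamptz",
    "date": "date",
    "binary": "binary",
}

def convert_column(name, dlt_type):
    """Map a single dlt column to an Iceberg (name, type) pair."""
    if dlt_type not in DLT_TO_ICEBERG:
        raise ValueError(f"unsupported dlt type: {dlt_type}")
    return name, DLT_TO_ICEBERG[dlt_type]
```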
```bash
uv run pytest tests/test_schema_converter.py -v
```

### Partition Building - `tests/test_partition_builder.py`
Tests partition spec generation from dlt hints.
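The rule being tested can be sketched roughly as follows; the function name and hint shape are hypothetical, not the builder's real API:

```python
# Hypothetical sketch of the hint -> transform rule: temporal columns get a
# temporal transform, everything else falls back to identity.
def transform_for_hint(column, data_type, granularity=None):
    if data_type in ("timestamp", "date"):
        return f"{granularity or 'day'}({column})"
    return f"identity({column})"
```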
```bash
uv run pytest tests/test_partition_builder.py -v
```

### Schema Casting - `tests/test_schema_casting.py`
Tests safe type casting with data loss detection.
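The idea behind data-loss detection can be shown with a pure-Python sketch; the real tests exercise Arrow-level casts, and these range constants are just for illustration:

```python
# Minimal sketch of data-loss detection when narrowing integers
# (pure-Python illustration; the real tests exercise Arrow-level casts).
INT_RANGES = {
    "int32": (-2**31, 2**31 - 1),
    "int64": (-2**63, 2**63 - 1),
}

def cast_ints(values, target):
    lo, hi = INT_RANGES[target]
    lossy = [v for v in values if not lo <= v <= hi]
    if lossy:
        raise OverflowError(f"{lossy} do not fit in {target}")
    return list(values)
```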
```bash
uv run pytest tests/test_schema_casting.py -v
```

### Error Handling - `tests/test_error_handling.py`
Tests retry logic and error categorization.
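The pattern under test looks roughly like this; the error classes and delays here are illustrative stand-ins, not the module's actual categorization:

```python
import time

# Sketch of the retry pattern: transient errors are retried with exponential
# backoff, anything else fails fast (error classes here are illustrative).
TRANSIENT = (ConnectionError, TimeoutError)

def with_retries(fn, attempts=3, base_delay=0.01):
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except TRANSIENT:
            if attempt == attempts:
                raise  # exhausted retries: surface the transient error
            time.sleep(base_delay * 2 ** (attempt - 1))
```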
```bash
uv run pytest tests/test_error_handling.py -v
```

### Basic E2E - `tests/test_destination_e2e.py`
Full pipeline test using SQLite catalog (no Docker).
```bash
uv run pytest tests/test_destination_e2e.py -v
```

### Atomic Commits - `tests/test_class_based_atomic.py`
Tests that multiple files are committed atomically in a single Iceberg snapshot.
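The buffer-then-commit pattern being verified can be sketched as below; `AtomicAppender` is a simplified stand-in, not the actual client class:

```python
# Buffer-then-commit sketch: data files accumulate during a load and a single
# commit publishes them as one snapshot (simplified stand-in for the client).
class AtomicAppender:
    def __init__(self):
        self.pending = []
        self.snapshots = []  # one entry per committed snapshot

    def add_file(self, path):
        self.pending.append(path)  # nothing visible to readers yet

    def commit(self):
        if self.pending:
            self.snapshots.append(tuple(self.pending))  # all files, one snapshot
            self.pending = []
```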
```bash
uv run pytest tests/test_class_based_atomic.py -v
```

### Merge Disposition - `tests/test_merge_disposition.py`
Tests upsert logic with primary keys.
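The upsert semantics can be sketched in a few lines; this is an illustration of the behavior, not the destination's actual merge path:

```python
# Minimal upsert sketch: rows matching on the primary key are replaced,
# new keys are appended (illustrative, not the destination's merge path).
def upsert(existing, incoming, key="id"):
    merged = {row[key]: row for row in existing}
    for row in incoming:
        merged[row[key]] = row  # replace on key match, insert otherwise
    return sorted(merged.values(), key=lambda r: r[key])
```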
```bash
uv run pytest tests/test_merge_disposition.py -v
```

### Schema Evolution - `tests/test_schema_evolution.py`
Tests adding columns and type promotions.
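The Iceberg table spec allows only lossless primitive promotions, which a check like this captures (the function is a sketch; the spec's promotions are int → long and float → double, plus decimal precision widening):

```python
# Iceberg's lossless primitive promotions (per the Iceberg table spec):
# int -> long and float -> double; anything else requires an exact match here.
SAFE_PROMOTIONS = {("int", "long"), ("float", "double")}

def can_promote(old, new):
    return old == new or (old, new) in SAFE_PROMOTIONS
```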
```bash
uv run pytest tests/test_schema_evolution.py -v
```

### REST Catalog - `tests/test_destination_rest_catalog.py`
Tests against the Nessie REST catalog with MinIO S3 storage. Verified robust across repeated runs.
Prerequisites:

```bash
# Start services
docker compose up -d

# Wait for Nessie to be ready (~30 seconds)
docker compose ps

# Verify Nessie
curl http://localhost:19120/api/v2/config
```

Run:

```bash
uv run pytest tests/test_destination_rest_catalog.py -v -s
```

Services:

- Nessie (REST catalog): http://localhost:19120
- MinIO (S3 storage): http://localhost:9000
- MinIO Console: http://localhost:9001
```bash
# Run only integration tests (require Docker)
uv run pytest -m integration -v

# Skip integration tests
uv run pytest -m "not integration" -v
```

- ✓ Schema conversion (dlt → Iceberg, all types)
- ✓ Partition building (temporal and identity transforms)
- ✓ Type support (primitives, lists, structs, nested)
- ✓ Write dispositions (append, replace, merge)
- ✓ Atomic commits (multiple files → single snapshot)
- ✓ Schema evolution (add columns, type promotions)
- ✓ Error handling and retries
- ✓ OAuth2 authentication (for REST catalogs)
- ✓ REST catalog integration (Nessie + MinIO)
The destination has two implementations:
- Function-based (`destination.py`) - legacy, commits per file
- Class-based (`destination_client.py`) - current, atomic multi-file commits
Both are tested. The class-based version is the recommended default.
```bash
# All unit tests
uv run pytest tests/test_schema_converter.py tests/test_partition_builder.py tests/test_schema_casting.py -v

# All E2E tests (SQLite)
uv run pytest tests/test_destination_e2e.py tests/test_class_based_atomic.py tests/test_merge_disposition.py -v

# Only integration tests
uv run pytest -m integration -v

# Specific test function
uv run pytest tests/test_class_based_atomic.py::test_class_based_atomic_commits -v
```

SQLite catalog (no Docker):

```python
import dlt
from datetime import datetime

from dlt_iceberg import iceberg_rest


@dlt.resource(name="test_events", write_disposition="append")
def events():
    yield {"id": 1, "timestamp": datetime.now(), "value": 100}


pipeline = dlt.pipeline(
    pipeline_name="test",
    destination=iceberg_rest(
        catalog_uri="sqlite:////tmp/catalog.db",
        warehouse="file:///tmp/warehouse",
        namespace="test",
    ),
)

info = pipeline.run(events())
print(info)
```

REST catalog (Nessie + MinIO):

```python
import dlt
from datetime import datetime

from dlt_iceberg import iceberg_rest


@dlt.resource(name="test_events", write_disposition="append")
def events():
    yield {"id": 1, "timestamp": datetime.now(), "value": 100}


pipeline = dlt.pipeline(
    pipeline_name="test",
    destination=iceberg_rest(
        catalog_uri="http://localhost:19120/iceberg/main",
        warehouse="s3://warehouse",  # MinIO bucket
        namespace="test",
        s3_endpoint="http://localhost:9000",
        s3_access_key_id="minioadmin",
        s3_secret_access_key="minioadmin",
    ),
)

info = pipeline.run(events())
print(info)
```

Verifying written data with PyIceberg:

```python
from pyiceberg.catalog import load_catalog

# SQLite catalog
catalog = load_catalog(
    "test",
    type="sql",
    uri="sqlite:////tmp/catalog.db",
    warehouse="file:///tmp/warehouse",
)

# Nessie catalog (dotted property names must be passed as string keys)
catalog = load_catalog(
    "test",
    **{
        "type": "rest",
        "uri": "http://localhost:19120/iceberg/main",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "minioadmin",
        "s3.secret-access-key": "minioadmin",
    },
)

table = catalog.load_table("test.test_events")
df = table.scan().to_arrow().to_pandas()
print(df)
```

For CI/CD:
```bash
# Run only unit tests (no Docker)
uv run pytest tests/ -m "not integration" -v

# Run with Docker services
docker compose up -d
uv run pytest tests/ -v
docker compose down
```

Import Errors:

```bash
uv sync
```

Integration Test Failures:

- Check services: `docker compose ps`
- Verify Nessie: `curl http://localhost:19120/api/v2/config`
- Check MinIO: `curl http://localhost:9000/minio/health/live`
- View logs: `docker compose logs nessie`
Slow Tests:

```bash
# Run a specific test
uv run pytest tests/test_schema_converter.py::test_convert_arrow_to_iceberg_basic_types -v

# Skip slow integration tests
uv run pytest -m "not integration" -v
```

Test Isolation:
All E2E tests create temporary directories and SQLite databases to avoid conflicts.
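The isolation pattern amounts to giving each test a fresh catalog database and warehouse path; a sketch under stated assumptions (the helper name and URI shapes are illustrative):

```python
import tempfile
from pathlib import Path

# Sketch of the isolation pattern: each test gets its own catalog DB and
# warehouse under a fresh temporary directory (helper name is illustrative).
def make_isolated_paths():
    root = Path(tempfile.mkdtemp(prefix="dlt_iceberg_test_"))
    return {
        "catalog_uri": f"sqlite:///{root / 'catalog.db'}",
        "warehouse": f"file://{root / 'warehouse'}",
    }
```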
The REST catalog test (`test_destination_rest_catalog.py`) is designed for reliability:
- Cleanup before each run (drops existing table)
- Health checks (skips if Nessie unavailable)
- Proper error handling
- No race conditions
- Idempotent (can run multiple times)
- Verified: 6+ consecutive runs without failure