
feat(iceberg): stream Iceberg writes with constant memory#3753

Open
ajinzrathod wants to merge 4 commits into dlt-hub:devel from ajinzrathod:feat/3752-iceberg-streaming-atomic-commit

Conversation

@ajinzrathod

Description

Closes #3752

Previously, Iceberg loads materialised the entire Arrow dataset in memory (self.arrow_dataset.to_table()) and upserts created one snapshot per batch. This caused OOM risk on large datasets and snapshot bloat that slowed metadata operations.

This PR streams all Iceberg write paths batch-by-batch with constant memory and produces minimal Iceberg snapshots — bringing Iceberg to parity with the Delta Lake code path which already uses RecordBatchReader.

Changes

dlt/common/libs/pyiceberg.py

  • write_iceberg_table now accepts Union[pa.Table, pa.RecordBatchReader]. When given a RecordBatchReader, it dispatches to the new _write_iceberg_table_streamed function.
  • _write_iceberg_table_streamed (new): Streams Arrow batches one at a time — each batch is written to a temp parquet file, uploaded to remote storage via Iceberg's IO, then discarded. All files are registered atomically with table.add_files() (append) or delete + add_files in a single transaction (replace). Memory stays constant: only one batch + one parquet file in memory at a time.
  • merge_iceberg_table now accepts Union[pa.Table, pa.RecordBatchReader] and delegates to the new _upsert_iceberg_table function.
  • _upsert_iceberg_table (new): Streams batches within a single table.transaction(). Updates go via txn.overwrite() only when rows actually changed (and only for upsert strategy, not insert-only). Inserts are collected as remote parquet files and registered via txn.add_files() at the end of the transaction. For pure-insert loads (e.g. first load into an empty table) this produces exactly one Iceberg snapshot.

dlt/destinations/impl/filesystem/filesystem.py

  • IcebergLoadFilesystemJob.run(): Reads schema cheaply from the first parquet file header (pq.read_schema) instead of materialising the full dataset. For merge path: streams via scanner(batch_readahead=0, fragment_readahead=0, use_threads=False).to_reader(). For append/replace: uses _iter_parquet_batches generator wrapped in RecordBatchReader.from_batches(). Never calls .to_table() on the full dataset. Explicit gc.collect() after loads and periodically during batching.
  • _iter_parquet_batches (new static method): Yields Arrow batches from parquet files one at a time for constant memory.

dlt/destinations/impl/filesystem/factory.py

  • Set recommended_file_size = 128MB for the filesystem destination so upstream produces reasonably-sized parquet files for streaming.

Key invariants

Related upstream issues

This addresses a known limitation in pyiceberg itself:

Testing

Tested locally with Docker (MinIO + Iceberg REST catalog, 4GB memory limit) across varying dataset sizes (up to 100 parquet files, 1M+ rows × 400 columns). Behavior is consistent at all scales:

| Scenario | Result |
| --- | --- |
| Replace | Constant memory, 1 atomic commit (DELETE + APPEND) |
| Upsert | 1 OVERWRITE + 1 APPEND within a single transaction, total row count preserved |
| Append | Constant memory, 1 APPEND snapshot |

A ready-to-use test harness with Docker Compose (MinIO + Iceberg REST catalog), data generators, and test scripts is available on the feat/iceberg-streaming-testing branch of my fork (github.com/ajinzrathod/dlt) if you'd like to reproduce locally.

Collaborator

@rudolfix rudolfix left a comment


this is really cool! and I think it solves a lot of practical problems until pyiceberg is fixed. a few things before we can merge this

  • some of the methods do garbage collection. could you parametrize how often it is done (which will also allow to disable it)?
  • this PR does not include tests. if you are using claude code (and probably any other decent agent) you should be able to ask it to add tests in the correct place and with compatible parametrization
  • we do not need to test on bucket. here IMO tests on local filesystem are sufficient
    ie. the test_open_table_pipeline module is a good start, and

```python
@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(
        table_format_local_configs=True,
        with_table_format="iceberg",
    ),
    ids=lambda x: x.name,
)
def test_iceberg_table_properties(
    destination_config: DestinationTestConfiguration,
) -> None:
```

is a good example parametrization

@ajinzrathod
Author

Updated with both changes:

GC parametrization: Added iceberg_gc_collect_interval config option (default 0 = disabled). Any positive integer triggers gc.collect() every N batches. Threaded through from FilesystemDestinationClientConfiguration -> IcebergLoadFilesystemJob -> streaming functions. Progress logging is decoupled from GC and always runs every 10 batches.
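A minimal sketch of that interval logic (names follow the description above; the wiring through the configuration and job classes is omitted):

```python
import gc


def process_batches(batches, gc_collect_interval: int = 0) -> int:
    """Process batches, running gc.collect() every N batches.

    gc_collect_interval=0 (the default) disables collection entirely;
    progress logging runs on its own fixed cadence, decoupled from GC.
    """
    processed = 0
    for _batch in batches:
        # ... write the batch to the destination here ...
        processed += 1
        if gc_collect_interval > 0 and processed % gc_collect_interval == 0:
            gc.collect()
        if processed % 10 == 0:
            pass  # log progress every 10 batches regardless of GC settings
    return processed
```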

Tests: Added 3 tests in test_open_table_pipeline.py using table_format_local_configs=True with with_table_format="iceberg":

  • test_iceberg_streamed_append_replace: parametrized for append and replace
  • test_iceberg_streamed_upsert: verifies updates and inserts with primary key

All passing locally on filesystem. Let me know if this looks good or if any other changes are needed.

@ajinzrathod ajinzrathod requested a review from rudolfix March 20, 2026 10:39
- Add store_decimal_as_integer=True to manual pq.write_table() calls
  to fix FIXED_LEN_BYTE_ARRAY vs INT64 mismatch
- Use txn.append() instead of add_files for partitioned tables since
  batches can span multiple partition values
- Separate update and insert transactions for partitioned upserts to
  work around pyiceberg 0.9.x SIGILL crash when mixing overwrite +
  append in one transaction
- Extract _upload_parquet_to_remote and _process_upsert_batch helpers
@ajinzrathod
Author

I was looking at the failed test cases on CI, all the filesystem-parquet-iceberg tests were failing with:

Unexpected physical type FIXED_LEN_BYTE_ARRAY for decimal(10, 3), expected INT64

1. Decimal encoding mismatch

PyArrow defaults to encoding decimals as FIXED_LEN_BYTE_ARRAY, but Iceberg expects INT64 for small-precision decimals. The new manual pq.write_table() calls in the streaming path were missing the flag to force integer encoding.

Fix: Included store_decimal_as_integer=True in all pq.write_table() calls.

2. Partitioned tables failing on streamed writes

While fixing the above, I noticed the streaming path used table.add_files() for all tables. This breaks for partitioned tables: add_files requires each file to contain data for exactly one partition value, but our batches can span multiple partitions.

Fix: Split into two strategies:

  • Unpartitioned: manual parquet write + add_files (single atomic commit)
  • Partitioned: txn.append() per batch, letting pyiceberg handle partition-aware file layout

3. SIGILL crash during upsert on partitioned tables

After fixing the decimal encoding, I re-ran the tests locally. The tests that were failing earlier now got past the load step, but the upsert test (test_merge_on_keys_in_schema_nested_hints) crashed with SIGILL (exit code 132); this had been masked by Bug 1. The root cause is a pyiceberg 0.9.x bug: mixing txn.overwrite() + txn.append() in the same transaction triggers a native code crash.

Fix: Separated updates (overwrite) and inserts (append) into two transactions for partitioned tables. This trades full atomicity for not crashing, acceptable since dlt pipelines are idempotent and a re-run recovers any missing inserts. Can be collapsed back into one transaction once pyiceberg fixes this upstream.

Cleanup

  • Extracted _upload_parquet_to_remote() to deduplicate the temp-file-write-and-upload logic shared by streamed writes and upserts
  • Extracted _process_upsert_batch() to flatten the deeply nested upsert loop

@ajinzrathod ajinzrathod changed the title feat(iceberg): stream Iceberg writes via single atomic commit with constant memory feat(iceberg): stream Iceberg writes with constant memory Mar 23, 2026
@rudolfix
Collaborator

@ajinzrathod I reviewed the code again. the fundamental change - to stream data via batch reader - makes total sense. but it seems to me that if we bump pyiceberg to 0.10 we won't need most of the custom upsert code here. pyiceberg implementation significantly improved and is also exposed in a way that allows to stream several batches in one transaction. I include implementation plan in next comment - I think it explains best what I think we should do here... pls let me know if you want to go with it, It is not a prob for us to take over - the goal is pretty clear now and mostly a matter of executing the plan.

@rudolfix
Collaborator

Review & Suggested Simplification

Thanks for the PR — it solves a real OOM problem and the streaming approach is the right direction. After investigation, we believe the implementation can be significantly simplified by bumping the pyiceberg minimum to >=0.10.0.

Context

IcebergLoadFilesystemJob calls self.arrow_dataset.to_table() which loads all parquet files into memory before writing to Iceberg. For large datasets this is fatal. The fix is: stream batches from the arrow dataset and pass them one at a time to pyiceberg's transaction API.

With pyiceberg >=0.10.0, txn.append() and txn.upsert() both write files to storage immediately (via _dataframe_to_data_files() → write_file() → pq.ParquetWriter) — only one batch + one parquet file in memory at a time, with lightweight DataFile metadata accumulating in the transaction. This means most of the manual parquet writing and internal API usage in this PR is unnecessary.

Key findings

txn.upsert() available in 0.10+: pyiceberg 0.10.0 moved upsert from Table to Transaction (PR #1817). The old code on devel already has the TODO: "replace the batching method with transaction with pyiceberg's release after 0.9.1". With txn.upsert(), the entire _process_upsert_batch reimplementation using internal APIs (upsert_util, expression_to_pyarrow, bind) becomes unnecessary.

txn.upsert() is better than the PR's reimplementation:

  • PR uses table.scan().to_arrow() (committed data only) — inserts from batch N are invisible to batch N+1, risking duplicate inserts across batches
  • txn.upsert() uses DataScan(table_metadata=self.table_metadata).to_arrow_batch_reader() — streams matched rows and sees uncommitted changes from previous batches within the same transaction
  • PR uses internal pyiceberg APIs; txn.upsert() is stable public API

SIGILL crash doesn't reproduce: We tested the single-transaction approach (mixing txn.overwrite() + txn.append()) on both pyiceberg 0.9.1 and 0.10.0 — all tests pass. The two-transaction workaround for partitioned tables is unnecessary.

store_decimal_as_integer is native in 0.10+: The monkey-patch (lines 47-58) can be removed.

Memory is constant: txn.append() and txn.upsert() write parquet files immediately to storage via pyiceberg's IO. The transaction only accumulates lightweight file metadata. Per-batch memory is bounded by batch size.

Proposed changes

1. Bump pyiceberg minimum to >=0.10.0

Enables txn.upsert() and native store_decimal_as_integer=True.

2. Rewrite write_iceberg_table — stream via txn.append()

Accept Union[pa.Table, pa.RecordBatchReader]. For RecordBatchReader, iterate batches in a single transaction:

```python
def write_iceberg_table(table, data, write_disposition):
    if isinstance(data, pa.RecordBatchReader):
        with table.transaction() as txn:
            if write_disposition == "replace" and table.current_snapshot():
                txn.delete(delete_filter=AlwaysTrue())
            for batch in data:
                batch_tbl = ensure_iceberg_compatible_arrow_data(pa.Table.from_batches([batch]))
                txn.append(batch_tbl)
    else:
        # pa.Table path — backward compatible
        ...
```

Delete: _write_iceberg_table_streamed, _write_streamed_partitioned, _write_streamed_add_files, _upload_parquet_to_remote, _UPLOAD_CHUNK_SIZE.

3. Rewrite merge_iceberg_table — stream via txn.upsert()

Iterate batches, call txn.upsert() per batch in a single transaction:

```python
batches = (
    data if isinstance(data, pa.RecordBatchReader)
    else data.to_batches(max_chunksize=1_000)
)
with table.transaction() as txn:
    for batch in batches:
        batch_tbl = ensure_iceberg_compatible_arrow_data(pa.Table.from_batches([batch]))
        txn.upsert(
            df=batch_tbl, join_cols=join_cols,
            when_matched_update_all=strategy == "upsert",
            when_not_matched_insert_all=True, case_sensitive=True,
        )
```

Delete: _upsert_iceberg_table, _process_upsert_batch.

4. Simplify IcebergLoadFilesystemJob.run() — stream from arrow dataset

Keep reading schema from parquet header. Use the same scanner approach for all paths:

```python
source_ds = self.arrow_dataset
with source_ds.scanner(
    batch_readahead=0, fragment_readahead=0, use_threads=False
).to_reader() as arrow_rbr:
    if self._load_table["write_disposition"] == "merge" and table is not None:
        merge_iceberg_table(table=table, data=arrow_rbr, ...)
    else:
        write_iceberg_table(table=table, data=arrow_rbr, ...)
del source_ds
```

Delete: _iter_parquet_batches static method.

5. Remove store_decimal_as_integer monkey-patch

Delete the if pyiceberg_semver < Version("0.10.0"): block. Native in 0.10+.

6. Remove recommended_file_size = 128MB

This is a global behavioral change affecting all filesystem users (jsonl, parquet, delta, iceberg). It belongs in a separate PR if desired.

7. Remove iceberg_gc_collect_interval config

Not needed — pyiceberg handles file writing and memory management internally via txn.append()/txn.upsert(). The scanner with readahead=0 already ensures constant memory on the read side.

What gets preserved from the PR

  • Streaming from arrow dataset via RecordBatchReader instead of .to_table() (the core fix)
  • Schema from parquet header via pq.read_schema()
  • write_iceberg_table() and merge_iceberg_table() accepting Union[pa.Table, RecordBatchReader]
  • Tests (adjusted)

What gets deleted

| Function/Config | Reason |
| --- | --- |
| _upload_parquet_to_remote() | pyiceberg writes files internally |
| _write_iceberg_table_streamed() | Replaced by txn.append() loop |
| _write_streamed_partitioned() | pyiceberg handles partitions |
| _write_streamed_add_files() | pyiceberg handles file registration |
| _process_upsert_batch() | Replaced by txn.upsert() |
| _upsert_iceberg_table() | Replaced by txn.upsert() loop |
| _UPLOAD_CHUNK_SIZE | No manual uploads |
| _iter_parquet_batches() | Scanner does this |
| iceberg_gc_collect_interval | Not needed with pyiceberg's writer |
| recommended_file_size = 128MB | Unrelated global change |
| store_decimal_as_integer patch | Native in 0.10+ |

@ajinzrathod
Author

Thanks for the detailed review! Regarding the SIGILL crash: I investigated further, and it's an Apple Silicon-specific issue in pyiceberg's native code.

Here's what I did:

I reverted the two-transaction workaround back to a single transaction (mixing txn.overwrite() + txn.append() in one transaction) and ran the test that was crashing:

```sh
ACTIVE_DESTINATIONS='["filesystem"]' ALL_FILESYSTEM_DRIVERS='["memory", "file"]' \
uv run pytest "tests/load/pipeline/test_merge_disposition.py::test_merge_on_keys_in_schema_nested_hints[upsert-filesystem-parquet-iceberg-no-staging-data]" -v
```

On my Mac (Apple Silicon), it crashes with SIGILL (exit code 132) — on both pyiceberg 0.9.1 and 0.10.0.


I then ran the exact same test inside a Docker container on Linux x86_64. Passes on Linux.

The crash is specific to Apple Silicon, likely in pyiceberg-core's native Rust code, which is compiled differently for ARM64 vs x86_64. The test uses a partitioned table, which seems to trigger a code path in the native writer that causes this.

Since CI runs on Linux, the single-transaction approach works fine there. But Apple Silicon users on pyiceberg 0.9.1 or 0.10.0 would hit this crash on partitioned upserts with complex schemas.

Agreed on bumping to >=0.10.0 and simplifying. I've been testing this approach locally with large datasets (5GB, 11GB, 55GB in a 4 GB container). Have some findings around memory, snapshot behavior, and a schema evolution edge case. Will share details shortly.

@ajinzrathod
Author

1. Scanner OOMs on large datasets

source_ds.scanner(batch_readahead=0, fragment_readahead=0, use_threads=False).to_reader() causes OOM (the process is killed) on large data. Tested with 11 GB in a 4 GB container. The PyArrow dataset scanner buffers row groups internally in its C++ memory pool regardless of batch_readahead=0.

  • gc.collect() and malloc_trim(0) don't help; it's Arrow's internal C++ allocation, not Python memory.

The scanner works fine on small data; our earlier tests with LIMIT_FILES=3 (~1.8 GB) completed successfully. It only OOMs when the data exceeds the container's memory limit: the scanner's internal buffering accumulates over time, and small data finishes before it becomes a problem.

Fix: pq.ParquetFile.iter_batches(batch_size=10_000), which reads one row group at a time with explicit memory control. Only parquet files are used in the Iceberg path, so pq.ParquetFile is always valid.

```python
def _iter_batches():
    for file_path in self.file_paths:
        pf = pq.ParquetFile(file_path)
        for batch in pf.iter_batches(batch_size=10_000):
            yield batch
        del pf

batches_gen = _iter_batches()
first_batch = next(batches_gen)
arrow_rbr = pa.RecordBatchReader.from_batches(
    first_batch.schema,
    itertools.chain([first_batch], batches_gen),
)
del first_batch
```

Streaming via pq.ParquetFile.iter_batches(batch_size=10_000) + txn.append()/txn.upsert() per batch solves the OOM problem. This produces multiple snapshots (rather than a single snapshot per operation), but the transaction commit is atomic: all snapshots apply together, or none do. No partial state is visible to readers.

@ajinzrathod
Author

2. Schema evolution breaks txn.upsert() scan

After table.update_schema(), the snapshot still references the old schema_id. txn.upsert() internally scans using use_ref() which resolves to the snapshot's schema_id, so it returns fewer columns than expected → ValueError: Target schema's field names are not matching.

How it happens:

  1. table.update_schema() updates the table's metadata to point to a new schema (e.g., schema_id=2), but the latest snapshot still references the old schema (schema_id=1).
  2. When txn.upsert() scans, it resolves the schema from the snapshot (schema_id=1, say 10 columns), but the incoming data has 11 columns.
  3. The scan returns 10-column rows, upsert expects 11 → ValueError.

This affects any dataset size; it's a metadata issue, not a scale issue. txn.append() is unaffected because it only writes and never scans existing data.

Workaround: Append an empty table after schema evolution to force a new snapshot with the updated schema_id.

N.B.: With the pq.ParquetFile.iter_batches approach, txn.upsert() also eliminates duplicate inserts across batches: unlike table.scan().to_arrow(), it sees uncommitted data from earlier batches within the same transaction.

@ajinzrathod
Author

ajinzrathod commented Mar 25, 2026

3. Trade-off in replace operation: txn.append() files are deletable but O(N²), fast_append() is O(N) but files can't be deleted

For streaming replace, we need to delete existing data first, then append new batches. We tested txn.delete(AlwaysTrue()) + fast_append().append_data_file(): O(N) performance, constant memory.

Problem: Files written by fast_append().append_data_file() + _dataframe_to_data_files() are not deleted by table.delete(AlwaysTrue()) or txn.delete(AlwaysTrue()). On a clean table with 20 files, only 1–2 files get deleted instead of all 20.

What we verified:

  • table.delete(AlwaysTrue()) on files written by txn.append() deleted all entries (60 → 0) — the delete mechanism itself works.
  • _StrictMetricsEvaluator returns the correct value for AlwaysTrue() — verified.
  • Snapshot refs and snapshot_by_name("main") are correct — verified.
  • The difference is in how files are registered in the manifest. txn.append() uses pyiceberg's full internal pipeline (_append_snapshot_producer → _FastAppendFiles → commit), while fast_append().append_data_file() manually registers DataFile objects from _dataframe_to_data_files(). The DataFile objects appear identical, but something in the manifest entry metadata (likely partition summaries, statistics format, or field IDs) causes _DeleteFiles._compute_deletes to classify them as "not relevant" and skip them.

Current approach: txn.delete(AlwaysTrue()) + txn.append() per batch in a single transaction. Replace is correct and atomic, but subject to the O(N²) performance issue below.

@ajinzrathod
Author

4. txn.append() and txn.upsert() have O(N²) performance on large datasets

Transaction.table_metadata is a property that replays all accumulated _updates with model_copy(deep=True) on every access (~15 times per txn.append() call). Each txn.append() adds 2 updates. After N appends: N × 15 × replay(2N) = O(N²).

For small datasets (<1 GB, ~20 batches) it's fine. For 55 GB (~5500 batches at batch_size=10_000), the append load step ran for 57 minutes and 34 seconds.
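The quadratic blow-up can be illustrated with a toy model (not pyiceberg code, just the same access pattern: every metadata read replays the full update list, and every append does roughly 15 reads and adds 2 updates):

```python
import copy


class ToyTransaction:
    """Mimics a transaction whose metadata property replays all updates."""

    def __init__(self):
        self._updates = []
        self.replayed = 0  # total updates deep-copied across all accesses

    @property
    def table_metadata(self):
        # Rebuilding metadata costs O(len(self._updates)) on every access.
        snapshot = copy.deepcopy(self._updates)
        self.replayed += len(snapshot)
        return snapshot

    def append(self, batch):
        # ~15 metadata reads per append, then 2 new updates: after N
        # appends the cumulative replay work is O(N^2).
        for _ in range(15):
            _ = self.table_metadata
        self._updates.extend([("add-snapshot", batch), ("set-ref", batch)])
```

In this model, doubling the number of appends roughly quadruples the total replay work, which matches the slowdown observed above.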


Upsert is even worse: txn.upsert() scans existing data on every call, in addition to the O(N²) metadata replay. Upserting 10 files (~3 GB, 500,000 rows) against 55 GB of existing data has been running for over 2 hours and is still not complete.

@ajinzrathod
Author

ajinzrathod commented Mar 25, 2026

Summary:

| Approach | Memory | Write perf | Replace (delete) | Status |
| --- | --- | --- | --- | --- |
| ParquetFile.iter_batches() + txn.append() | Constant | O(N²) | Works | Current code |
| ParquetFile.iter_batches() + fast_append() | Constant | O(N) | Broken | Abandoned |

FWIW, it would be valuable to validate append, replace, and merge operations on datasets larger than 10 GB (with container RAM smaller than the data size) to confirm the streaming approach at scale. I tested the proposed approach on Apple Silicon using a 4 GB Docker container with datasets ranging from 2 GB to 55 GB.


The test test_merge_on_keys_in_schema_nested_hints[upsert-filesystem-parquet-iceberg-no-staging-data] crashes with SIGILL (exit code 132) on Apple Silicon (ARM64). This happens with txn.upsert() on partitioned tables. The crash is in pyiceberg-core's Rust extension. It reproduces regardless of the read approach (pq.ParquetFile.iter_batches() or scanner). This is a pyiceberg-core bug and does not reproduce on Linux x86_64 (CI should pass).

I'm not pushing code to this repo yet, will do after we have proper sign-off on the approach. My current working code is on my personal fork in the feat/3752-v2-iceberg-streaming-atomic-commit branch for reference against the findings above.

@ajinzrathod
Author

> I think it explains best what I think we should do here... pls let me know if you want to go with it, It is not a prob for us to take over

Sounds good, happy to look at the implementation plan. Given the findings above (especially the O(N²) txn.append() performance, the SIGILL crash, and the fast_append() delete issue), I'm interested to hear your thoughts. Open to collaborating on this or letting you take it forward, whichever works best.
