
feat: Add native sort merge join operator [experimental]#3871

Draft
andygrove wants to merge 12 commits into apache:main from andygrove:comet-native-smj

Conversation

@andygrove
Member

Which issue does this PR close?

N/A - New experimental feature

Rationale for this change

This PR introduces a Comet-owned sort merge join operator (CometSortMergeJoinExec) that replaces DataFusion's SortMergeJoinExec. The motivations are:

  1. Memory efficiency — Spills entire buffered batches (including join key arrays) to Arrow IPC, fixing DataFusion's gap where join key arrays stay in memory during spills
  2. Performance — Implements Spark's key-reuse optimization: when consecutive streamed rows share the same join key, the buffered match group is reused without re-scanning
  3. Decoupling — Owns the join operator so Comet is not coupled to DataFusion's evolving SMJ API and can evolve independently
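The key-reuse optimization in point 2 can be illustrated with a deliberately simplified sketch. This uses plain `i32` keys and in-memory slices instead of Arrow `RowConverter` rows and record batches; the function and variable names are hypothetical, not the actual Comet implementation:

```rust
// Simplified sketch of Spark-style key-reuse in a sort merge join.
// Both inputs are sorted by key. When consecutive streamed rows share
// a key, the previously collected buffered match group is reused
// without re-scanning the buffered side.
fn smj_inner(streamed: &[(i32, &str)], buffered: &[(i32, &str)]) -> Vec<(String, String)> {
    let mut out = Vec::new();
    let mut b = 0; // cursor into the buffered side
    let mut cached_key: Option<i32> = None;
    let mut match_group: Vec<usize> = Vec::new(); // buffered indices for cached_key

    for &(skey, sval) in streamed {
        if cached_key != Some(skey) {
            // New streamed key: advance the buffered cursor and rebuild the group.
            match_group.clear();
            while b < buffered.len() && buffered[b].0 < skey {
                b += 1;
            }
            let mut j = b;
            while j < buffered.len() && buffered[j].0 == skey {
                match_group.push(j);
                j += 1;
            }
            cached_key = Some(skey);
        }
        // Key-reuse: duplicate streamed keys skip straight to emission.
        for &j in &match_group {
            out.push((sval.to_string(), buffered[j].1.to_string()));
        }
    }
    out
}
```

In the real operator the cached key is an `OwnedRow` produced by Arrow's `RowConverter`, and the match group may span multiple (possibly spilled) buffered batches.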

A configuration toggle (spark.comet.exec.sortMergeJoin.useNative, default true) allows switching between the new and DataFusion implementations for A/B benchmarking.

This is experimental. The implementation passes all existing tests but has known limitations listed below.

What changes are included in this PR?

New files (native/core/src/execution/joins/)

  • sort_merge_join.rs — CometSortMergeJoinExec implementing DataFusion's ExecutionPlan trait
  • sort_merge_join_stream.rs — Streaming state machine (Init, PollStreamed, PollBuffered, Comparing, CollectingBuffered, Joining, OutputReady, DrainUnmatched, DrainBuffered, Exhausted)
  • buffered_batch.rs — BufferedMatchGroup with batch-level Arrow IPC spilling via SpillManager
  • output_builder.rs — Batch materialization using Arrow take/concat kernels
  • filter.rs — Join filter evaluation with corrected masks for outer/semi/anti joins
  • metrics.rs — Metrics matching CometMetricNode.scala (input/output rows/batches, join_time, peak_mem, spill counts)
  • tests.rs — 11 unit tests covering all join types and spilling
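To give a feel for the state-machine style in sort_merge_join_stream.rs, here is an illustrative sketch. The enum variants mirror the states listed above, but the transition function is a simplified invention for one decision point (comparing current keys), not the actual Comet logic:

```rust
// Illustrative state enum mirroring the states listed for
// sort_merge_join_stream.rs. The transition function below is a
// deliberately simplified sketch, not the real implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SmjState {
    Init,
    PollStreamed,
    PollBuffered,
    Comparing,
    CollectingBuffered,
    Joining,
    OutputReady,
    DrainUnmatched,
    DrainBuffered,
    Exhausted,
}

/// One hypothetical step: which state to enter after comparing the
/// current streamed and buffered keys (None = that side is exhausted).
fn after_compare(streamed: Option<i32>, buffered: Option<i32>) -> SmjState {
    match (streamed, buffered) {
        (None, None) => SmjState::Exhausted,
        // Streamed exhausted: unmatched buffered rows are still owed
        // for right/full outer joins.
        (None, Some(_)) => SmjState::DrainBuffered,
        // Buffered exhausted: unmatched streamed rows are still owed
        // for left/full outer joins.
        (Some(_), None) => SmjState::DrainUnmatched,
        (Some(s), Some(b)) if s == b => SmjState::CollectingBuffered,
        (Some(s), Some(b)) if s < b => SmjState::PollStreamed,
        _ => SmjState::PollBuffered,
    }
}
```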

Modified files

  • CometConf.scala — New spark.comet.exec.sortMergeJoin.useNative config
  • planner.rs — Conditional operator construction based on config
  • jni_api.rs — Pass spark config through to planner
  • spark_config.rs — Config constant

Supported join types

All 6: Inner, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti
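The NullEqualsNothing semantics mentioned below interact with the outer join types: a null key never matches, yet the null-keyed streamed row must still appear in outer output. A toy sketch (using `Option<i32>` in place of Arrow null-aware key rows; names are hypothetical):

```rust
// Sketch of Spark's NullEqualsNothing semantics: a null join key never
// matches anything, not even another null. Hypothetical helper, not the
// actual Comet code.
fn keys_match(a: Option<i32>, b: Option<i32>) -> bool {
    match (a, b) {
        (Some(x), Some(y)) => x == y,
        _ => false, // any null key matches nothing
    }
}

/// Left outer join over toy single-column rows: unmatched streamed rows
/// (including null-keyed ones) are emitted with a null counterpart.
fn left_outer(streamed: &[Option<i32>], buffered: &[Option<i32>]) -> Vec<(Option<i32>, Option<i32>)> {
    let mut out = Vec::new();
    for &s in streamed {
        let mut matched = false;
        for &b in buffered {
            if keys_match(s, b) {
                out.push((s, b));
                matched = true;
            }
        }
        if !matched {
            out.push((s, None)); // null counterpart for the outer join
        }
    }
    out
}
```

The same `keys_match` rule is what makes null keys drop out of inner and semi results while still surfacing in outer and anti results.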

Key design decisions

  • Streamed/buffered assignment matches Spark (RightOuter swaps sides, all others: left=streamed, right=buffered)
  • Null handling matches Spark (NullEqualsNothing — null keys never match for inner/semi, emit with null counterpart for outer/anti)
  • Batch-level spilling via DataFusion's SpillManager and Arrow IPC when MemoryReservation::try_grow() fails
  • Key-reuse caches streamed key as OwnedRow via Arrow's RowConverter
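The spilling decision in the third bullet can be modeled in miniature. `MemoryPool` and `MatchGroup` below are stand-ins for DataFusion's `MemoryReservation` and Comet's `BufferedMatchGroup`, not the real types; the point is only the shape of the try-grow-or-spill decision:

```rust
// Toy model of the batch-level spilling decision: try to grow a memory
// reservation; on failure, divert the batch to "disk". Stand-in types,
// not DataFusion's MemoryReservation or Comet's BufferedMatchGroup.
struct MemoryPool {
    limit: usize,
    used: usize,
}

impl MemoryPool {
    /// Succeeds only if the reservation fits under the limit.
    fn try_grow(&mut self, bytes: usize) -> bool {
        if self.used + bytes <= self.limit {
            self.used += bytes;
            true
        } else {
            false
        }
    }
}

#[derive(Default)]
struct MatchGroup {
    in_memory: Vec<usize>, // sizes of in-memory batches
    spilled: Vec<usize>,   // sizes of batches written out (Arrow IPC in the real code)
}

impl MatchGroup {
    /// Buffer a batch, spilling it when the reservation cannot grow.
    fn push_batch(&mut self, size: usize, pool: &mut MemoryPool) {
        if pool.try_grow(size) {
            self.in_memory.push(size);
        } else {
            self.spilled.push(size);
        }
    }
}
```

In the real operator the spilled batches include the join key arrays, which is the memory-efficiency gap over DataFusion's SortMergeJoinExec described in the rationale.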

Known limitations / future work

  • Full outer + filter cross-group tracking: Buffered rows from earlier key groups that were unmatched across all streamed rows will not be null-joined. Requires tracking unmatched buffered rows across key group boundaries.
  • No codegen: Spark has whole-stage codegen for SMJ; this implementation uses interpreted evaluation.
  • A/B comparison test: Automated Scala test comparing native vs DataFusion output not yet added.

How are these changes tested?

  • 11 Rust unit tests covering all 6 join types (inner, left/right/full outer, left semi, left anti), null key handling, duplicate keys (many-to-many), empty results, and forced spilling under a 1 KB memory limit
  • Existing CometJoinSuite (10 tests) passes with the new implementation as a drop-in replacement, including SortMergeJoin with and without join filters across all join types

andygrove added 10 commits April 1, 2026 06:46
Add OutputBuilder that accumulates matched/null-joined index pairs
during the sort merge join's Joining state and materializes them into
Arrow RecordBatches. Includes BufferedMatchGroup stub with spill
support and join filter evaluation module.
…anti

Handle JoinSide::None variant in build_filter_candidate_batch to fix
compilation with DataFusion 52.4.0 which includes a None variant in
the JoinSide enum.
Implement the streaming sort merge join state machine that drives two
sorted input streams, compares join keys, and produces joined output
batches. The state machine handles all join types (inner, left/right
outer, full outer, semi, anti) with key-reuse optimization via
RowConverter, multi-batch match group collection, null key handling,
optional join filter evaluation, and memory-aware spilling.
Simplify Init state logic to avoid identical if-branches and use
Arc::clone instead of .clone() on ref-counted pointers.
Adds spark.comet.exec.sortMergeJoin.useNative config (default true) to
switch between Comet's native sort merge join and DataFusion's
SortMergeJoinExec. Passes spark config through JNI to the native planner
for runtime selection.
Fix two bugs discovered by the tests:
- Output builder indices referenced stale streamed batch after it was
  cleared; defer batch clearing until after flush
- Advancing buffered side discarded entire batch instead of advancing
  past just the first row; use slice to preserve remaining rows
Add 7 new test cases covering left/right/full outer joins, left
semi/anti joins, null key handling in outer joins, and an inner join
under memory pressure that forces spilling.

Fix two bugs found by the new tests:
- Full/right outer join now correctly emits unmatched buffered rows
  that remain after the streamed side is exhausted (new DrainBuffered
  state in the join stream state machine).
- Spill read-back no longer panics inside an async runtime by using
  block_in_place to allow the nested block_on call.
@andygrove andygrove changed the title [EXPERIMENTAL] Add native sort merge join operator feat: Add native sort merge join operator [experimental] Apr 1, 2026
- Fix N×M spill read issue: cache loaded batches across all columns
  instead of reloading per column. Extracted group_by_batch helpers.
- Remove unused streamed_schema field from OutputBuilder
- Encapsulate BufferedBatch.matched behind mark_matched/unmatched_indices
- Remove duplicate take_buffered_matched/take_buffered_null_joins with
  shared take_from_groups helper
RowConverter.convert_columns() was called on the full streamed batch
for every row during key-reuse checks. Now converted once when a new
streamed batch arrives and cached as streamed_rows. Key-reuse and
cache_streamed_key index directly into the cached Rows.