From 947352d7ef21b9e52eba959bda21f85c41f1a05a Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Wed, 18 Mar 2026 19:32:23 +0800 Subject: [PATCH 1/3] Fix memory reservation starvation in sort-merge (#20642) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Closes #. This PR fixes memory reservation starvation in sort-merge when multiple sort partitions share a GreedyMemoryPool. When multiple `ExternalSorter` instances run concurrently and share a single memory pool, the merge phase starves: 1. Each partition pre-reserves sort_spill_reservation_bytes via merge_reservation 2. When entering the merge phase, new_empty() was used to create a new reservation starting at 0 bytes, while the pre-reserved bytes sat idle in ExternalSorter.merge_reservation 3. Those freed bytes were immediately consumed by other partitions racing for memory 4. The merge could no longer allocate memory from the pool → OOM / starvation ~~I can't find a deterministic way to reproduce the bug, but it occurs in our production.~~ Add an end-to-end test to verify the fix --- datafusion/physical-plan/src/sorts/builder.rs | 54 +++++- .../src/sorts/multi_level_merge.rs | 48 ++++-- datafusion/physical-plan/src/sorts/sort.rs | 159 +++++++++++++++++- 3 files changed, 238 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs index 9b2fa968222c4..a462b832056bd 100644 --- a/datafusion/physical-plan/src/sorts/builder.rs +++ b/datafusion/physical-plan/src/sorts/builder.rs @@ -40,9 +40,24 @@ pub struct BatchBuilder { /// Maintain a list of [`RecordBatch`] and their corresponding stream batches: Vec<(usize, RecordBatch)>, - /// Accounts for memory used by buffered batches + /// Accounts for memory used by buffered batches. + /// + /// May include pre-reserved bytes (from `sort_spill_reservation_bytes`) + /// that were transferred via [`MemoryReservation::take()`] to prevent + /// starvation when concurrent sort partitions compete for pool memory. reservation: MemoryReservation, + /// Tracks the actual memory used by buffered batches (not including + /// pre-reserved bytes). This allows [`Self::push_batch`] to skip pool + /// allocation requests when the pre-reserved bytes cover the batch. + batches_mem_used: usize, + + /// The initial reservation size at construction time. When the reservation + /// is pre-loaded with `sort_spill_reservation_bytes` (via `take()`), this + /// records that amount so we never shrink below it, maintaining the + /// anti-starvation guarantee throughout the merge. + initial_reservation: usize, + /// The current [`BatchCursor`] for each stream cursors: Vec, @@ -59,19 +74,26 @@ impl BatchBuilder { batch_size: usize, reservation: MemoryReservation, ) -> Self { + let initial_reservation = reservation.size(); Self { schema, batches: Vec::with_capacity(stream_count * 2), cursors: vec![BatchCursor::default(); stream_count], indices: Vec::with_capacity(batch_size), reservation, + batches_mem_used: 0, + initial_reservation, } } /// Append a new batch in `stream_idx` pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> { - self.reservation - .try_grow(get_record_batch_memory_size(&batch))?; + let size = get_record_batch_memory_size(&batch); + self.batches_mem_used += size; + // Only request additional memory from the pool when actual batch + // usage exceeds the current reservation (which may include + // pre-reserved bytes from sort_spill_reservation_bytes). + try_grow_reservation_to_at_least(&mut self.reservation, self.batches_mem_used)?; let batch_idx = self.batches.len(); self.batches.push((stream_idx, batch)); self.cursors[stream_idx] = BatchCursor { @@ -143,14 +165,38 @@ impl BatchBuilder { stream_cursor.batch_idx = retained; retained += 1; } else { - self.reservation.shrink(get_record_batch_memory_size(batch)); + self.batches_mem_used -= get_record_batch_memory_size(batch); } retain }); + // Release excess memory back to the pool, but never shrink below + // initial_reservation to maintain the anti-starvation guarantee + // for the merge phase. + let target = self.batches_mem_used.max(self.initial_reservation); + if self.reservation.size() > target { + self.reservation.shrink(self.reservation.size() - target); + } + Ok(Some(RecordBatch::try_new( Arc::clone(&self.schema), columns, )?)) } } + +/// Try to grow `reservation` so it covers at least `needed` bytes. +/// +/// When a reservation has been pre-loaded with bytes (e.g. via +/// [`MemoryReservation::take()`]), this avoids redundant pool +/// allocations: if the reservation already covers `needed`, this is +/// a no-op; otherwise only the deficit is requested from the pool. +pub(crate) fn try_grow_reservation_to_at_least( + reservation: &mut MemoryReservation, + needed: usize, +) -> Result<()> { + if needed > reservation.size() { + reservation.try_grow(needed - reservation.size())?; + } + Ok(()) +} diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs index 2e0d668a29559..a276881fb48eb 100644 --- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs +++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs @@ -30,7 +30,8 @@ use arrow::datatypes::SchemaRef; use datafusion_common::Result; use datafusion_execution::memory_pool::MemoryReservation; -use crate::sorts::sort::get_reserved_bytes_for_record_batch_size; +use crate::sorts::builder::try_grow_reservation_to_at_least; +use crate::sorts::sort::get_reserved_byte_for_record_batch_size; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::stream::RecordBatchStreamAdapter; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; @@ -253,7 +254,12 @@ impl MultiLevelMergeBuilder { // Need to merge multiple streams (_, _) => { - let mut memory_reservation = self.reservation.new_empty(); + // Transfer any pre-reserved bytes (from sort_spill_reservation_bytes) + // to the merge memory reservation. This prevents starvation when + // concurrent sort partitions compete for pool memory: the pre-reserved + // bytes cover spill file buffer reservations without additional pool + // allocation. + let mut memory_reservation = self.reservation.take(); // Don't account for existing streams memory // as we are not holding the memory for them @@ -269,6 +275,15 @@ impl MultiLevelMergeBuilder { let is_only_merging_memory_streams = sorted_spill_files.is_empty(); + // If no spill files were selected (e.g. all too large for + // available memory but enough in-memory streams exist), + // return the pre-reserved bytes to self.reservation so + // create_new_merge_sort can transfer them to the merge + // stream's BatchBuilder. + if is_only_merging_memory_streams { + mem::swap(&mut self.reservation, &mut memory_reservation); + } + for spill in sorted_spill_files { let stream = self .spill_manager @@ -337,8 +352,10 @@ impl MultiLevelMergeBuilder { builder = builder.with_bypass_mempool(); } else { // If we are only merging in-memory streams, we need to use the memory reservation - // because we don't know the maximum size of the batches in the streams - builder = builder.with_reservation(self.reservation.new_empty()); + // because we don't know the maximum size of the batches in the streams. + // Use take() to transfer any pre-reserved bytes so the merge can use them + // as its initial budget without additional pool allocation. + builder = builder.with_reservation(self.reservation.take()); } builder.build() @@ -356,17 +373,22 @@ impl MultiLevelMergeBuilder { ) -> Result<(Vec, usize)> { assert_ne!(buffer_len, 0, "Buffer length must be greater than 0"); let mut number_of_spills_to_read_for_current_phase = 0; + // Track total memory needed for spill file buffers. When the + // reservation has pre-reserved bytes (from sort_spill_reservation_bytes), + // those bytes cover the first N spill files without additional pool + // allocation, preventing starvation under memory pressure. + let mut total_needed: usize = 0; for spill in &self.sorted_spill_files { - // For memory pools that are not shared this is good, for other this is not - // and there should be some upper limit to memory reservation so we won't starve the system - match reservation.try_grow( - get_reserved_bytes_for_record_batch_size( - spill.max_record_batch_memory, - // Size will be the same as the sliced size, bc it is a spilled batch. - spill.max_record_batch_memory, - ) * buffer_len, - ) { + let per_spill = get_reserved_byte_for_record_batch_size( + spill.max_record_batch_memory, + ) * buffer_len; + total_needed += per_spill; + + // For memory pools that are not shared this is good, for other + // this is not and there should be some upper limit to memory + // reservation so we won't starve the system. + match try_grow_reservation_to_at_least(reservation, total_needed) { Ok(_) => { number_of_spills_to_read_for_current_phase += 1; } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index d8828b20b91bb..5413bad0bb1d0 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -338,11 +338,6 @@ impl ExternalSorter { /// 2. A combined streaming merge incorporating both in-memory /// batches and data from spill files on disk. async fn sort(&mut self) -> Result { - // Release the memory reserved for merge back to the pool so - // there is some left when `in_mem_sort_stream` requests an - // allocation. - self.merge_reservation.free(); - if self.spilled_before() { // Sort `in_mem_batches` and spill it first. If there are many // `in_mem_batches` and the memory limit is almost reached, merging @@ -351,6 +346,13 @@ impl ExternalSorter { self.sort_and_spill_in_mem_batches().await?; } + // Transfer the pre-reserved merge memory to the streaming merge + // using `take()` instead of `new_empty()`. This ensures the merge + // stream starts with `sort_spill_reservation_bytes` already + // allocated, preventing starvation when concurrent sort partitions + // compete for pool memory. `take()` moves the bytes atomically + // without releasing them back to the pool, so other partitions + // cannot race to consume the freed memory. StreamingMergeBuilder::new() .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files)) .with_spill_manager(self.spill_manager.clone()) @@ -359,9 +361,14 @@ impl ExternalSorter { .with_metrics(self.metrics.baseline.clone()) .with_batch_size(self.batch_size) .with_fetch(None) - .with_reservation(self.merge_reservation.new_empty()) + .with_reservation(self.merge_reservation.take()) .build() } else { + // Release the memory reserved for merge back to the pool so + // there is some left when `in_mem_sort_stream` requests an + // allocation. Only needed for the non-spill path; the spill + // path transfers the reservation to the merge stream instead. + self.merge_reservation.free(); self.in_mem_sort_stream(self.metrics.baseline.clone()) } } @@ -371,6 +378,12 @@ impl ExternalSorter { self.reservation.size() } + /// How much memory is reserved for the merge phase? + #[cfg(test)] + fn merge_reservation_size(&self) -> usize { + self.merge_reservation.size() + } + /// How many bytes have been spilled to disk? fn spilled_bytes(&self) -> usize { self.metrics.spill_metrics.spilled_bytes.value() @@ -2721,4 +2734,138 @@ mod tests { Ok(()) } + + /// Verifies that `ExternalSorter::sort()` transfers the pre-reserved + /// merge bytes to the merge stream via `take()`, rather than leaving + /// them in the sorter (via `new_empty()`). + /// + /// 1. Create a sorter with a tight memory pool and insert enough data + /// to force spilling + /// 2. Verify `merge_reservation` holds the pre-reserved bytes before sort + /// 3. Call `sort()` to get the merge stream + /// 4. Verify `merge_reservation` is now 0 (bytes transferred to merge stream) + /// 5. Simulate contention: a competing consumer grabs all available pool memory + /// 6. Verify the merge stream still works (it uses its pre-reserved bytes + /// as initial budget, not requesting from pool starting at 0) + /// + /// With `new_empty()` (before fix), step 4 fails: `merge_reservation` + /// still holds the bytes, the merge stream starts with 0 budget, and + /// those bytes become unaccounted-for reserved memory that nobody uses. + #[tokio::test] + async fn test_sort_merge_reservation_transferred_not_freed() -> Result<()> { + use datafusion_execution::memory_pool::{ + GreedyMemoryPool, MemoryConsumer, MemoryPool, + }; + use futures::TryStreamExt; + + let sort_spill_reservation_bytes: usize = 10 * 1024; // 10 KB + + // Pool: merge reservation (10KB) + enough room for sort to work. + // The room must accommodate batch data accumulation before spilling. + let sort_working_memory: usize = 40 * 1024; // 40 KB for sort operations + let pool_size = sort_spill_reservation_bytes + sort_working_memory; + let pool: Arc = Arc::new(GreedyMemoryPool::new(pool_size)); + + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::clone(&pool)) + .build_arc()?; + + let metrics_set = ExecutionPlanMetricsSet::new(); + let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); + + let mut sorter = ExternalSorter::new( + 0, + Arc::clone(&schema), + [PhysicalSortExpr::new_default(Arc::new(Column::new("x", 0)))].into(), + 128, // batch_size + sort_spill_reservation_bytes, + usize::MAX, // sort_in_place_threshold_bytes (high to avoid concat path) + SpillCompression::Uncompressed, + &metrics_set, + Arc::clone(&runtime), + )?; + + // Insert enough data to force spilling. + let num_batches = 200; + for i in 0..num_batches { + let values: Vec = ((i * 100)..((i + 1) * 100)).rev().collect(); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(values))], + )?; + sorter.insert_batch(batch).await?; + } + + assert!( + sorter.spilled_before(), + "Test requires spilling to exercise the merge path" + ); + + // Before sort(), merge_reservation holds sort_spill_reservation_bytes. + assert!( + sorter.merge_reservation_size() >= sort_spill_reservation_bytes, + "merge_reservation should hold the pre-reserved bytes before sort()" + ); + + // Call sort() to get the merge stream. With the fix (take()), + // the pre-reserved merge bytes are transferred to the merge + // stream. Without the fix (free() + new_empty()), the bytes + // are released back to the pool and the merge stream starts + // with 0 bytes. + let merge_stream = sorter.sort().await?; + + // THE KEY ASSERTION: after sort(), merge_reservation must be 0. + // This proves take() transferred the bytes to the merge stream, + // rather than them being freed back to the pool where other + // partitions could steal them. + assert_eq!( + sorter.merge_reservation_size(), + 0, + "After sort(), merge_reservation should be 0 (bytes transferred \ + to merge stream via take()). If non-zero, the bytes are still \ + held by the sorter and will be freed on drop, allowing other \ + partitions to steal them." + ); + + // Drop the sorter to free its reservations back to the pool. + drop(sorter); + + // Simulate contention: another partition grabs ALL available + // pool memory. If the merge stream didn't receive the + // pre-reserved bytes via take(), it will fail when it tries + // to allocate memory for reading spill files. + let mut contender = MemoryConsumer::new("CompetingPartition").register(&pool); + let available = pool_size.saturating_sub(pool.reserved()); + if available > 0 { + contender.try_grow(available).unwrap(); + } + + // The merge stream must still produce correct results despite + // the pool being fully consumed by the contender. This only + // works if sort() transferred the pre-reserved bytes to the + // merge stream (via take()) rather than freeing them. + let batches: Vec = merge_stream.try_collect().await?; + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total_rows, + (num_batches * 100) as usize, + "Merge stream should produce all rows even under memory contention" + ); + + // Verify data is sorted + let merged = concat_batches(&schema, &batches)?; + let col = merged.column(0).as_primitive::(); + for i in 1..col.len() { + assert!( + col.value(i - 1) <= col.value(i), + "Output should be sorted, but found {} > {} at index {}", + col.value(i - 1), + col.value(i), + i + ); + } + + drop(contender); + Ok(()) + } } From 3b08f753ebbe1107d1f4fc903c99069e9d27fc37 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 20 Mar 2026 15:12:34 +0800 Subject: [PATCH 2/3] Cherry-pick: Fix sort merge interleave overflow (#20922) Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-plan/src/sorts/builder.rs | 251 +++++++++++++++--- datafusion/physical-plan/src/sorts/merge.rs | 123 ++++++++- .../src/sorts/multi_level_merge.rs | 6 +- .../src/sorts/sort_preserving_merge.rs | 56 ++++ 4 files changed, 389 insertions(+), 47 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs index a462b832056bd..ddd2b1d05331e 100644 --- a/datafusion/physical-plan/src/sorts/builder.rs +++ b/datafusion/physical-plan/src/sorts/builder.rs @@ -16,11 +16,16 @@ // under the License. use crate::spill::get_record_batch_memory_size; +use arrow::array::ArrayRef; use arrow::compute::interleave; use arrow::datatypes::SchemaRef; +use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; -use datafusion_common::Result; +use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::MemoryReservation; +use log::warn; +use std::any::Any; +use std::panic::{AssertUnwindSafe, catch_unwind}; use std::sync::Arc; #[derive(Debug, Copy, Clone, Default)] @@ -126,49 +131,56 @@ impl BatchBuilder { &self.schema } - /// Drains the in_progress row indexes, and builds a new RecordBatch from them - /// - /// Will then drop any batches for which all rows have been yielded to the output - /// - /// Returns `None` if no pending rows - pub fn build_record_batch(&mut self) -> Result> { - if self.is_empty() { - return Ok(None); - } - - let columns = (0..self.schema.fields.len()) + /// Try to interleave all columns using the given index slice. + fn try_interleave_columns( + &self, + indices: &[(usize, usize)], + ) -> Result> { + (0..self.schema.fields.len()) .map(|column_idx| { let arrays: Vec<_> = self .batches .iter() .map(|(_, batch)| batch.column(column_idx).as_ref()) .collect(); - Ok(interleave(&arrays, &self.indices)?) + recover_offset_overflow_from_panic(|| interleave(&arrays, indices)) }) - .collect::>>()?; - - self.indices.clear(); - - // New cursors are only created once the previous cursor for the stream - // is finished. This means all remaining rows from all but the last batch - // for each stream have been yielded to the newly created record batch - // - // We can therefore drop all but the last batch for each stream - let mut batch_idx = 0; - let mut retained = 0; - self.batches.retain(|(stream_idx, batch)| { - let stream_cursor = &mut self.cursors[*stream_idx]; - let retain = stream_cursor.batch_idx == batch_idx; - batch_idx += 1; - - if retain { - stream_cursor.batch_idx = retained; - retained += 1; - } else { - self.batches_mem_used -= get_record_batch_memory_size(batch); - } - retain - }); + .collect::>>() + } + + /// Builds a record batch from the first `rows_to_emit` buffered rows. + fn finish_record_batch( + &mut self, + rows_to_emit: usize, + columns: Vec, + ) -> Result { + // Remove consumed indices, keeping any remaining for the next call. + self.indices.drain(..rows_to_emit); + + // Only clean up fully-consumed batches when all indices are drained, + // because remaining indices may still reference earlier batches. + if self.indices.is_empty() { + // New cursors are only created once the previous cursor for the stream + // is finished. This means all remaining rows from all but the last batch + // for each stream have been yielded to the newly created record batch + // + // We can therefore drop all but the last batch for each stream + let mut batch_idx = 0; + let mut retained = 0; + self.batches.retain(|(stream_idx, batch)| { + let stream_cursor = &mut self.cursors[*stream_idx]; + let retain = stream_cursor.batch_idx == batch_idx; + batch_idx += 1; + + if retain { + stream_cursor.batch_idx = retained; + retained += 1; + } else { + self.batches_mem_used -= get_record_batch_memory_size(batch); + } + retain + }); + } // Release excess memory back to the pool, but never shrink below // initial_reservation to maintain the anti-starvation guarantee @@ -178,10 +190,27 @@ impl BatchBuilder { self.reservation.shrink(self.reservation.size() - target); } - Ok(Some(RecordBatch::try_new( - Arc::clone(&self.schema), - columns, - )?)) + RecordBatch::try_new(Arc::clone(&self.schema), columns).map_err(Into::into) + } + + /// Drains the in_progress row indexes, and builds a new RecordBatch from them + /// + /// Will then drop any batches for which all rows have been yielded to the output. + /// If an offset overflow occurs (e.g. string/list offsets exceed i32::MAX), + /// retries with progressively fewer rows until it succeeds. + /// + /// Returns `None` if no pending rows + pub fn build_record_batch(&mut self) -> Result> { + if self.is_empty() { + return Ok(None); + } + + let (rows_to_emit, columns) = + retry_interleave(self.indices.len(), self.indices.len(), |rows_to_emit| { + self.try_interleave_columns(&self.indices[..rows_to_emit]) + })?; + + Ok(Some(self.finish_record_batch(rows_to_emit, columns)?)) } } @@ -200,3 +229,143 @@ pub(crate) fn try_grow_reservation_to_at_least( } Ok(()) } + +/// Returns true if the error is an Arrow offset overflow. +fn is_offset_overflow(e: &DataFusionError) -> bool { + matches!( + e, + DataFusionError::ArrowError(boxed, _) + if matches!(boxed.as_ref(), ArrowError::OffsetOverflowError(_)) + ) +} + +fn offset_overflow_error() -> DataFusionError { + DataFusionError::ArrowError(Box::new(ArrowError::OffsetOverflowError(0)), None) +} + +fn recover_offset_overflow_from_panic(f: F) -> Result +where + F: FnOnce() -> std::result::Result, +{ + // Arrow's interleave can panic on i32 offset overflow with + // `.expect("overflow")` / `.expect("offset overflow")`. + // Catch only those specific panics so the caller can retry + // with fewer rows while unrelated defects still unwind. + match catch_unwind(AssertUnwindSafe(f)) { + Ok(result) => Ok(result?), + Err(panic_payload) => { + if is_arrow_offset_overflow_panic(panic_payload.as_ref()) { + Err(offset_overflow_error()) + } else { + std::panic::resume_unwind(panic_payload); + } + } + } +} + +fn retry_interleave( + mut rows_to_emit: usize, + total_rows: usize, + mut interleave: F, +) -> Result<(usize, T)> +where + F: FnMut(usize) -> Result, +{ + loop { + match interleave(rows_to_emit) { + Ok(value) => return Ok((rows_to_emit, value)), + Err(e) if is_offset_overflow(&e) => { + rows_to_emit /= 2; + if rows_to_emit == 0 { + return Err(e); + } + warn!( + "Interleave offset overflow with {total_rows} rows, retrying with {rows_to_emit}" + ); + } + Err(e) => return Err(e), + } + } +} + +fn panic_message(payload: &(dyn Any + Send)) -> Option<&str> { + if let Some(msg) = payload.downcast_ref::<&str>() { + return Some(msg); + } + if let Some(msg) = payload.downcast_ref::() { + return Some(msg.as_str()); + } + None +} + +/// Returns true if a caught panic payload matches the Arrow offset overflows +/// raised by interleave's offset builders. +fn is_arrow_offset_overflow_panic(payload: &(dyn Any + Send)) -> bool { + matches!(panic_message(payload), Some("overflow" | "offset overflow")) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::error::ArrowError; + + #[test] + fn test_retry_interleave_halves_rows_until_success() { + let mut attempts = Vec::new(); + + let (rows_to_emit, result) = retry_interleave(4, 4, |rows_to_emit| { + attempts.push(rows_to_emit); + if rows_to_emit > 1 { + Err(offset_overflow_error()) + } else { + Ok("ok") + } + }) + .unwrap(); + + assert_eq!(rows_to_emit, 1); + assert_eq!(result, "ok"); + assert_eq!(attempts, vec![4, 2, 1]); + } + + #[test] + fn test_recover_offset_overflow_from_panic() { + let error = recover_offset_overflow_from_panic( + || -> std::result::Result<(), ArrowError> { panic!("offset overflow") }, + ) + .unwrap_err(); + + assert!(is_offset_overflow(&error)); + } + + #[test] + fn test_recover_offset_overflow_from_panic_rethrows_unrelated_panics() { + let panic_payload = catch_unwind(AssertUnwindSafe(|| { + let _ = recover_offset_overflow_from_panic( + || -> std::result::Result<(), ArrowError> { panic!("capacity overflow") }, + ); + })); + + assert!(panic_payload.is_err()); + } + + #[test] + fn test_is_arrow_offset_overflow_panic() { + let overflow = Box::new("overflow") as Box; + assert!(is_arrow_offset_overflow_panic(overflow.as_ref())); + + let offset_overflow = + Box::new(String::from("offset overflow")) as Box; + assert!(is_arrow_offset_overflow_panic(offset_overflow.as_ref())); + + let capacity_overflow = Box::new("capacity overflow") as Box; + assert!(!is_arrow_offset_overflow_panic(capacity_overflow.as_ref())); + + let arithmetic_overflow = + Box::new(String::from("attempt to multiply with overflow")) + as Box; + assert!(!is_arrow_offset_overflow_panic( + arithmetic_overflow.as_ref() + )); + } +} diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 272816251daf9..c29933535adc5 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -53,6 +53,14 @@ pub(crate) struct SortPreservingMergeStream { /// `fetch` limit. done: bool, + /// Whether buffered rows should be drained after `done` is set. + /// + /// This is enabled when we stop because the `fetch` limit has been + /// reached, allowing partial batches left over after overflow handling to + /// be emitted on subsequent polls. It remains disabled for terminal + /// errors so the stream does not yield data after returning `Err`. + drain_in_progress_on_done: bool, + /// A loser tree that always produces the minimum cursor /// /// Node 0 stores the top winner, Nodes 1..num_streams store @@ -164,6 +172,7 @@ impl SortPreservingMergeStream { streams, metrics, done: false, + drain_in_progress_on_done: false, cursors: (0..stream_count).map(|_| None).collect(), prev_cursors: (0..stream_count).map(|_| None).collect(), round_robin_tie_breaker_mode: false, @@ -203,11 +212,28 @@ impl SortPreservingMergeStream { } } + fn emit_in_progress_batch(&mut self) -> Result> { + let rows_before = self.in_progress.len(); + let result = self.in_progress.build_record_batch(); + self.produced += rows_before - self.in_progress.len(); + result + } + fn poll_next_inner( &mut self, cx: &mut Context<'_>, ) -> Poll>> { if self.done { + // When `build_record_batch()` hits an i32 offset overflow (e.g. + // combined string offsets exceed 2 GB), it emits a partial batch + // and keeps the remaining rows in `self.in_progress.indices`. + // Drain those leftover rows before terminating the stream, + // otherwise they would be silently dropped. + // Repeated overflows are fine — each poll emits another partial + // batch until `in_progress` is fully drained. + if self.drain_in_progress_on_done && !self.in_progress.is_empty() { + return Poll::Ready(self.emit_in_progress_batch().transpose()); + } return Poll::Ready(None); } // Once all partitions have set their corresponding cursors for the loser tree, @@ -283,14 +309,13 @@ impl SortPreservingMergeStream { // stop sorting if fetch has been reached if self.fetch_reached() { self.done = true; + self.drain_in_progress_on_done = true; } else if self.in_progress.len() < self.batch_size { continue; } } - self.produced += self.in_progress.len(); - - return Poll::Ready(self.in_progress.build_record_batch().transpose()); + return Poll::Ready(self.emit_in_progress_batch().transpose()); } } @@ -542,3 +567,95 @@ impl RecordBatchStream for SortPreservingMergeStream Arc::clone(self.in_progress.schema()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::metrics::ExecutionPlanMetricsSet; + use crate::sorts::stream::PartitionedStream; + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_execution::memory_pool::{ + MemoryConsumer, MemoryPool, UnboundedMemoryPool, + }; + use futures::task::noop_waker_ref; + use std::cmp::Ordering; + + #[derive(Debug)] + struct EmptyPartitionedStream; + + impl PartitionedStream for EmptyPartitionedStream { + type Output = Result<(DummyValues, RecordBatch)>; + + fn partitions(&self) -> usize { + 1 + } + + fn poll_next( + &mut self, + _cx: &mut Context<'_>, + _stream_idx: usize, + ) -> Poll> { + Poll::Ready(None) + } + } + + #[derive(Debug)] + struct DummyValues; + + impl CursorValues for DummyValues { + fn len(&self) -> usize { + 0 + } + + fn eq(_l: &Self, _l_idx: usize, _r: &Self, _r_idx: usize) -> bool { + unreachable!("done-path test should not compare cursors") + } + + fn eq_to_previous(_cursor: &Self, _idx: usize) -> bool { + unreachable!("done-path test should not compare cursors") + } + + fn compare(_l: &Self, _l_idx: usize, _r: &Self, _r_idx: usize) -> Ordering { + unreachable!("done-path test should not compare cursors") + } + } + + #[test] + fn test_done_drains_buffered_rows() { + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])); + let pool: Arc = Arc::new(UnboundedMemoryPool::default()); + let reservation = MemoryConsumer::new("test").register(&pool); + let metrics = ExecutionPlanMetricsSet::new(); + + let mut stream = SortPreservingMergeStream::::new( + Box::new(EmptyPartitionedStream), + Arc::clone(&schema), + BaselineMetrics::new(&metrics, 0), + 16, + Some(1), + reservation, + true, + ); + + let batch = + RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1]))]) + .unwrap(); + stream.in_progress.push_batch(0, batch).unwrap(); + stream.in_progress.push_row(0); + stream.done = true; + stream.drain_in_progress_on_done = true; + + let waker = noop_waker_ref(); + let mut cx = Context::from_waker(waker); + + match stream.poll_next_inner(&mut cx) { + Poll::Ready(Some(Ok(batch))) => assert_eq!(batch.num_rows(), 1), + other => { + panic!("expected buffered rows to be drained after done, got {other:?}") + } + } + assert!(stream.in_progress.is_empty()); + assert!(matches!(stream.poll_next_inner(&mut cx), Poll::Ready(None))); + } +} diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs index a276881fb48eb..9a30f209be643 100644 --- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs +++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs @@ -380,9 +380,9 @@ impl MultiLevelMergeBuilder { let mut total_needed: usize = 0; for spill in &self.sorted_spill_files { - let per_spill = get_reserved_byte_for_record_batch_size( - spill.max_record_batch_memory, - ) * buffer_len; + let per_spill = + get_reserved_byte_for_record_batch_size(spill.max_record_batch_memory) + * buffer_len; total_needed += per_spill; // For memory pools that are not shared this is good, for other diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 32e0825085ae2..0c926256c8139 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -478,6 +478,7 @@ mod tests { .with_session_config(config); Ok(Arc::new(task_ctx)) } + // The number in the function is highly related to the memory limit we are testing, // any change of the constant should be aware of fn generate_spm_for_round_robin_tie_breaker( @@ -1523,4 +1524,59 @@ mod tests { Err(_) => exec_err!("SortPreservingMerge caused a deadlock"), } } + + #[tokio::test] + async fn test_sort_merge_stops_after_error_with_buffered_rows() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])); + let sort: LexOrdering = [PhysicalSortExpr::new_default(Arc::new(Column::new( + "i", 0, + )) + as Arc)] + .into(); + + let mut stream0 = RecordBatchReceiverStream::builder(Arc::clone(&schema), 2); + let tx0 = stream0.tx(); + let schema0 = Arc::clone(&schema); + stream0.spawn(async move { + let batch = + RecordBatch::try_new(schema0, vec![Arc::new(Int32Array::from(vec![1]))])?; + tx0.send(Ok(batch)).await.unwrap(); + tx0.send(exec_err!("stream failure")).await.unwrap(); + Ok(()) + }); + + let mut stream1 = RecordBatchReceiverStream::builder(Arc::clone(&schema), 1); + let tx1 = stream1.tx(); + let schema1 = Arc::clone(&schema); + stream1.spawn(async move { + let batch = + RecordBatch::try_new(schema1, vec![Arc::new(Int32Array::from(vec![2]))])?; + tx1.send(Ok(batch)).await.unwrap(); + Ok(()) + }); + + let metrics = ExecutionPlanMetricsSet::new(); + let reservation = + MemoryConsumer::new("test").register(&task_ctx.runtime_env().memory_pool); + + let mut merge_stream = StreamingMergeBuilder::new() + .with_streams(vec![stream0.build(), stream1.build()]) + .with_schema(Arc::clone(&schema)) + .with_expressions(&sort) + .with_metrics(BaselineMetrics::new(&metrics, 0)) + .with_batch_size(task_ctx.session_config().batch_size()) + .with_fetch(None) + .with_reservation(reservation) + .build()?; + + let first = merge_stream.next().await.unwrap(); + assert!(first.is_err(), "expected merge stream to surface the error"); + assert!( + merge_stream.next().await.is_none(), + "merge stream yielded data after returning an error" + ); + + Ok(()) + } } From b0e4e6a4abdfb66f07174c08c41517fb806e7ae4 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 23 Mar 2026 10:31:53 +0800 Subject: [PATCH 3/3] Fix function name mismatch after cherry-pick to branch-52 The cherry-picked commit from branch-51 used `get_reserved_byte_for_record_batch_size` (1 param), but branch-52 has `get_reserved_bytes_for_record_batch_size` (2 params). Update the call site to use the branch-52 function signature. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../physical-plan/src/sorts/multi_level_merge.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs index 9a30f209be643..8985e1d8c70ee 100644 --- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs +++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs @@ -31,7 +31,7 @@ use datafusion_common::Result; use datafusion_execution::memory_pool::MemoryReservation; use crate::sorts::builder::try_grow_reservation_to_at_least; -use crate::sorts::sort::get_reserved_byte_for_record_batch_size; +use crate::sorts::sort::get_reserved_bytes_for_record_batch_size; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::stream::RecordBatchStreamAdapter; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; @@ -380,9 +380,11 @@ impl MultiLevelMergeBuilder { let mut total_needed: usize = 0; for spill in &self.sorted_spill_files { - let per_spill = - get_reserved_byte_for_record_batch_size(spill.max_record_batch_memory) - * buffer_len; + let per_spill = get_reserved_bytes_for_record_batch_size( + spill.max_record_batch_memory, + // Size will be the same as the sliced size, bc it is a spilled batch. + spill.max_record_batch_memory, + ) * buffer_len; total_needed += per_spill; // For memory pools that are not shared this is good, for other