From 0362373f4669037c4c518d60f38f5992aa8bb269 Mon Sep 17 00:00:00 2001 From: Vignesh <149236000+vigneshsiva11@users.noreply.github.com> Date: Wed, 4 Mar 2026 15:18:14 +0000 Subject: [PATCH 1/2] Parquet: split batches transparently when i32 offset would overflow (#7973) When reading Parquet byte-array columns (Utf8 / Binary) into Arrow arrays with 32-bit offsets, the reader previously returned an error the moment accumulated data in a batch exceeded 2 GiB. This made large Parquet row groups (e.g. NLP datasets with long text columns) completely unreadable with default settings. This commit makes the Parquet reader treat batch_size as a *target* rather than a hard limit: when the next value would overflow the i32 offset type, the decoder stops early and returns the partial batch. The decoder's internal position is left at the unread value, so the following read_records() call resumes seamlessly - no rows are lost, duplicated, or reordered. Key changes ----------- * OffsetBuffer::would_overflow(data_len) - new inline helper that uses checked_add to safely detect whether appending data_len bytes would exceed the capacity of offset type I, without any mutation. * ByteArrayDecoderPlain::read - checks would_overflow before each try_push and breaks out of the loop when true; fixes max_remaining_values accounting to subtract actual reads rather than requested reads. * ByteArrayDecoderDeltaLength::read - same pattern; advances length_offset and data_offset only by what was actually consumed. * ByteArrayDecoderDelta::read - checks would_overflow inside the callback closure and uses an overflow flag to distinguish a clean stop from a genuine error. * ByteArrayDecoderDictionary::read - processes dictionary keys one at a time via decoder.read(1, ...) so that the DictIndexDecoder never advances past an unconsumed key on overflow. Tests ----- * test_would_overflow - unit test for the new helper covering both i32 and i64 offset types, including the usize overflow edge case. * test_plain_decoder_partial_read - confirms a 3-value PLAIN page is correctly split across two read() calls with no data lost. Fixes #7973 --- parquet/src/arrow/array_reader/byte_array.rs | 147 +++++++++++++++++-- parquet/src/arrow/buffer/offset_buffer.rs | 37 +++++ 2 files changed, 168 insertions(+), 16 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 0acbe6501924..2e082a0e345b 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -397,17 +397,26 @@ impl ByteArrayDecoderPlain { return Err(ParquetError::EOF("eof decoding byte array".into())); } + // Stop early rather than overflow the 32-bit offset buffer. + // self.offset is NOT advanced, so the next read() call resumes + // from this value in the following batch. + if output.would_overflow(len as usize) { + break; + } + output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; self.offset = end_offset; read += 1; } - self.max_remaining_values -= to_read; + // Use actual reads, not the requested amount, so max_remaining_values + // stays correct when we stop early. + self.max_remaining_values -= read; if self.validate_utf8 { - output.check_valid_utf8(initial_values_length)?; + output.check_valid_utf8(initial_values_length)? } - Ok(to_read) + Ok(read) } pub fn skip(&mut self, to_skip: usize) -> Result { @@ -489,22 +498,33 @@ impl ByteArrayDecoderDeltaLength { output.values.reserve(total_bytes); let mut current_offset = self.data_offset; + let mut read = 0; for length in src_lengths { - let end_offset = current_offset + *length as usize; + let value_len = *length as usize; + let end_offset = current_offset + value_len; + + // Stop early rather than overflow the 32-bit offset buffer. + // length_offset / data_offset are only advanced by what was + // actually consumed, so the next read() resumes correctly. + if output.would_overflow(value_len) { + break; + } + output.try_push( &self.data.as_ref()[current_offset..end_offset], self.validate_utf8, )?; current_offset = end_offset; + read += 1; } self.data_offset = current_offset; - self.length_offset += to_read; + self.length_offset += read; if self.validate_utf8 { - output.check_valid_utf8(initial_values_length)?; + output.check_valid_utf8(initial_values_length)? } - Ok(to_read) + Ok(read) } fn skip(&mut self, to_skip: usize) -> Result { @@ -542,13 +562,35 @@ impl ByteArrayDecoderDelta { let initial_values_length = output.values.len(); output.offsets.reserve(len.min(self.decoder.remaining())); - let read = self - .decoder - .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?; - - if self.validate_utf8 { - output.check_valid_utf8(initial_values_length)?; + let mut read = 0; + let mut overflow = false; + let validate_utf8 = self.validate_utf8; + + let result = self.decoder.read(len, |bytes| { + // Stop early rather than overflow the 32-bit offset buffer. + // Returning Err leaves the DeltaByteArrayDecoder positioned at + // this value, so the next read() resumes correctly. + if output.would_overflow(bytes.len()) { + overflow = true; + return Err(general_err!("index overflow decoding byte array")); + } + output.try_push(bytes, validate_utf8)?; + read += 1; + Ok(()) + }); + + match result { + Ok(_) => { + if self.validate_utf8 { + output.check_valid_utf8(initial_values_length)?; + } + } + // Overflow is expected – decoder is correctly positioned at the + // value that didn't fit, ready for the next batch. + Err(_) if overflow => {} + Err(e) => return Err(e), } + Ok(read) } @@ -580,9 +622,45 @@ impl ByteArrayDecoderDictionary { return Ok(0); } - self.decoder.read(len, |keys| { - output.extend_from_dictionary(keys, dict.offsets.as_slice(), dict.values.as_slice()) - }) + let dict_offsets = dict.offsets.as_slice(); + let dict_values = dict.values.as_slice(); + let mut total_read = 0; + + // Process one key at a time so we can stop cleanly when the output + // buffer would overflow. When the closure returns Err the + // DictIndexDecoder does NOT advance its position, so the same key will + // be retried in the next batch – no values are lost or skipped. + while total_read < len { + let mut overflow = false; + let n = self.decoder.read(1, |keys| { + let key = keys[0] as usize; + if key + 1 >= dict_offsets.len() { + return Err(general_err!( + "dictionary key beyond bounds of dictionary: 0..{}", + dict_offsets.len().saturating_sub(1) + )); + } + let start = dict_offsets[key].as_usize(); + let end = dict_offsets[key + 1].as_usize(); + let value = &dict_values[start..end]; + + if output.would_overflow(value.len()) { + overflow = true; + return Err(general_err!("index overflow decoding byte array")); + } + // Dictionary values were validated at dictionary-page decode time. + output.try_push(value, false) + }); + + match n { + Ok(0) => break, // no more values in this page + Ok(_) => total_read += 1, // successfully pushed one key + Err(_) if overflow => break, // would overflow – stop for this batch + Err(e) => return Err(e), // real error + } + } + + Ok(total_read) } fn skip( @@ -607,6 +685,43 @@ mod tests { use arrow_array::{Array, StringArray}; use arrow_buffer::Buffer; + /// Verify that `ByteArrayDecoderPlain` transparently splits a page across + /// multiple `read()` calls when asked for fewer values than the page holds, + /// and that no values are lost between calls. + #[test] + fn test_plain_decoder_partial_read() { + let mut page: Vec = Vec::new(); + for s in [b"foo" as &[u8], b"bar", b"baz"] { + page.extend_from_slice(&(s.len() as u32).to_le_bytes()); + page.extend_from_slice(s); + } + + let mut decoder = ByteArrayDecoderPlain::new( + bytes::Bytes::from(page), + 3, + Some(3), + false, + ); + + let mut output = OffsetBuffer::::default(); + + // First read: ask for 2 out of 3 values. + let n = decoder.read(&mut output, 2).unwrap(); + assert_eq!(n, 2); + assert_eq!(&output.values, b"foobar"); + assert_eq!(output.offsets.as_slice(), &[0, 3, 6]); + + // Second read: gets the remaining value. + let n2 = decoder.read(&mut output, 2).unwrap(); + assert_eq!(n2, 1); + assert_eq!(&output.values, b"foobarbaz"); + assert_eq!(output.offsets.as_slice(), &[0, 3, 6, 9]); + + // No more values. + let n3 = decoder.read(&mut output, 1).unwrap(); + assert_eq!(n3, 0); + } + #[test] fn test_byte_array_decoder() { let (pages, encoded_dictionary) = diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs index 209ed4e5c15f..f648ddb5d6aa 100644 --- a/parquet/src/arrow/buffer/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -53,6 +53,19 @@ impl OffsetBuffer { self.len() == 0 } + /// Returns `true` if appending `data_len` bytes would overflow the offset type `I`. + /// + /// Used by decoders to stop filling a batch early rather than returning an error, + /// allowing the remaining values to be emitted in a subsequent batch. + #[inline] + pub fn would_overflow(&self, data_len: usize) -> bool { + // Use checked_add to handle the case where the sum itself overflows usize. + match self.values.len().checked_add(data_len) { + Some(total) => I::from_usize(total).is_none(), + None => true, // usize addition overflowed → definitely can't fit + } + } + /// If `validate_utf8` this verifies that the first character of `data` is /// the start of a UTF-8 codepoint /// @@ -318,6 +331,30 @@ mod tests { buffer.check_valid_utf8(12).unwrap_err(); } + #[test] + fn test_would_overflow() { + // Buffer with 5 bytes already written. + let mut buf = OffsetBuffer::::default(); + buf.try_push(b"hello", false).unwrap(); // values.len() == 5 + + // Within i32::MAX – should not report overflow. + assert!(!buf.would_overflow(0)); + assert!(!buf.would_overflow(1)); + // 5 + (i32::MAX - 5) == i32::MAX, still representable. + assert!(!buf.would_overflow(i32::MAX as usize - 5)); + // 5 + (i32::MAX - 4) == i32::MAX + 1, overflows i32. + assert!(buf.would_overflow(i32::MAX as usize - 4)); + assert!(buf.would_overflow(i32::MAX as usize)); + // usize::MAX must be caught without panicking. + assert!(buf.would_overflow(usize::MAX)); + + // i64 offset type: the i32 boundary is fine. + let mut buf64 = OffsetBuffer::::default(); + buf64.try_push(b"hello", false).unwrap(); + assert!(!buf64.would_overflow(i32::MAX as usize - 4)); + assert!(!buf64.would_overflow(i32::MAX as usize)); + } + #[test] fn test_pad_nulls_empty() { let mut buffer = OffsetBuffer::::default(); From 1421fcd87b591ac0e45f34c98163827f12b9db0e Mon Sep 17 00:00:00 2001 From: Vignesh <149236000+vigneshsiva11@users.noreply.github.com> Date: Wed, 4 Mar 2026 15:44:57 +0000 Subject: [PATCH 2/2] style: fix cargo fmt for ByteArrayDecoderPlain::new call --- parquet/src/arrow/array_reader/byte_array.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 2e082a0e345b..69c116bf7678 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -696,12 +696,7 @@ mod tests { page.extend_from_slice(s); } - let mut decoder = ByteArrayDecoderPlain::new( - bytes::Bytes::from(page), - 3, - Some(3), - false, - ); + let mut decoder = ByteArrayDecoderPlain::new(bytes::Bytes::from(page), 3, Some(3), false); let mut output = OffsetBuffer::::default();