diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 0acbe6501924..69c116bf7678 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -397,17 +397,26 @@ impl ByteArrayDecoderPlain { return Err(ParquetError::EOF("eof decoding byte array".into())); } + // Stop early rather than overflow the 32-bit offset buffer. + // self.offset is NOT advanced, so the next read() call resumes + // from this value in the following batch. + if output.would_overflow(len as usize) { + break; + } + output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; self.offset = end_offset; read += 1; } - self.max_remaining_values -= to_read; + // Use actual reads, not the requested amount, so max_remaining_values + // stays correct when we stop early. + self.max_remaining_values -= read; if self.validate_utf8 { - output.check_valid_utf8(initial_values_length)?; + output.check_valid_utf8(initial_values_length)? } - Ok(to_read) + Ok(read) } pub fn skip(&mut self, to_skip: usize) -> Result<usize> { @@ -489,22 +498,33 @@ impl ByteArrayDecoderDeltaLength { output.values.reserve(total_bytes); let mut current_offset = self.data_offset; + let mut read = 0; for length in src_lengths { - let end_offset = current_offset + *length as usize; + let value_len = *length as usize; + let end_offset = current_offset + value_len; + + // Stop early rather than overflow the 32-bit offset buffer. + // length_offset / data_offset are only advanced by what was + // actually consumed, so the next read() resumes correctly.
+ if output.would_overflow(value_len) { + break; + } + output.try_push( &self.data.as_ref()[current_offset..end_offset], self.validate_utf8, )?; current_offset = end_offset; + read += 1; } self.data_offset = current_offset; - self.length_offset += to_read; + self.length_offset += read; if self.validate_utf8 { - output.check_valid_utf8(initial_values_length)?; + output.check_valid_utf8(initial_values_length)? } - Ok(to_read) + Ok(read) } fn skip(&mut self, to_skip: usize) -> Result<usize> { @@ -542,13 +562,35 @@ impl ByteArrayDecoderDelta { let initial_values_length = output.values.len(); output.offsets.reserve(len.min(self.decoder.remaining())); - let read = self - .decoder - .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?; - - if self.validate_utf8 { - output.check_valid_utf8(initial_values_length)?; + let mut read = 0; + let mut overflow = false; + let validate_utf8 = self.validate_utf8; + + let result = self.decoder.read(len, |bytes| { + // Stop early rather than overflow the 32-bit offset buffer. + // Returning Err leaves the DeltaByteArrayDecoder positioned at + // this value, so the next read() resumes correctly. + if output.would_overflow(bytes.len()) { + overflow = true; + return Err(general_err!("index overflow decoding byte array")); + } + output.try_push(bytes, validate_utf8)?; + read += 1; + Ok(()) + }); + + match result { + Ok(_) => { + if self.validate_utf8 { + output.check_valid_utf8(initial_values_length)?; + } + } + // Overflow is expected – decoder is correctly positioned at the + // value that didn't fit, ready for the next batch.
+ Err(_) if overflow => {} + Err(e) => return Err(e), } + Ok(read) } @@ -580,9 +622,45 @@ impl ByteArrayDecoderDictionary { return Ok(0); } - self.decoder.read(len, |keys| { - output.extend_from_dictionary(keys, dict.offsets.as_slice(), dict.values.as_slice()) - }) + let dict_offsets = dict.offsets.as_slice(); + let dict_values = dict.values.as_slice(); + let mut total_read = 0; + + // Process one key at a time so we can stop cleanly when the output + // buffer would overflow. When the closure returns Err the + // DictIndexDecoder does NOT advance its position, so the same key will + // be retried in the next batch – no values are lost or skipped. + while total_read < len { + let mut overflow = false; + let n = self.decoder.read(1, |keys| { + let key = keys[0] as usize; + if key + 1 >= dict_offsets.len() { + return Err(general_err!( + "dictionary key beyond bounds of dictionary: 0..{}", + dict_offsets.len().saturating_sub(1) + )); + } + let start = dict_offsets[key].as_usize(); + let end = dict_offsets[key + 1].as_usize(); + let value = &dict_values[start..end]; + + if output.would_overflow(value.len()) { + overflow = true; + return Err(general_err!("index overflow decoding byte array")); + } + // Dictionary values were validated at dictionary-page decode time. + output.try_push(value, false) + }); + + match n { + Ok(0) => break, // no more values in this page + Ok(_) => total_read += 1, // successfully pushed one key + Err(_) if overflow => break, // would overflow – stop for this batch + Err(e) => return Err(e), // real error + } + } + + Ok(total_read) } fn skip( @@ -607,6 +685,38 @@ mod tests { use arrow_array::{Array, StringArray}; use arrow_buffer::Buffer; + /// Verify that `ByteArrayDecoderPlain` transparently splits a page across + /// multiple `read()` calls when asked for fewer values than the page holds, + /// and that no values are lost between calls. 
+ #[test] + fn test_plain_decoder_partial_read() { + let mut page: Vec<u8> = Vec::new(); + for s in [b"foo" as &[u8], b"bar", b"baz"] { + page.extend_from_slice(&(s.len() as u32).to_le_bytes()); + page.extend_from_slice(s); + } + + let mut decoder = ByteArrayDecoderPlain::new(bytes::Bytes::from(page), 3, Some(3), false); + + let mut output = OffsetBuffer::<i32>::default(); + + // First read: ask for 2 out of 3 values. + let n = decoder.read(&mut output, 2).unwrap(); + assert_eq!(n, 2); + assert_eq!(&output.values, b"foobar"); + assert_eq!(output.offsets.as_slice(), &[0, 3, 6]); + + // Second read: gets the remaining value. + let n2 = decoder.read(&mut output, 2).unwrap(); + assert_eq!(n2, 1); + assert_eq!(&output.values, b"foobarbaz"); + assert_eq!(output.offsets.as_slice(), &[0, 3, 6, 9]); + + // No more values. + let n3 = decoder.read(&mut output, 1).unwrap(); + assert_eq!(n3, 0); + } + #[test] fn test_byte_array_decoder() { let (pages, encoded_dictionary) = diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs index 209ed4e5c15f..f648ddb5d6aa 100644 --- a/parquet/src/arrow/buffer/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -53,6 +53,19 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> { self.len() == 0 } + /// Returns `true` if appending `data_len` bytes would overflow the offset type `I`. + /// + /// Used by decoders to stop filling a batch early rather than returning an error, + /// allowing the remaining values to be emitted in a subsequent batch. + #[inline] + pub fn would_overflow(&self, data_len: usize) -> bool { + // Use checked_add to handle the case where the sum itself overflows usize.
+ match self.values.len().checked_add(data_len) { + Some(total) => I::from_usize(total).is_none(), + None => true, // usize addition overflowed → definitely can't fit + } + } + /// If `validate_utf8` this verifies that the first character of `data` is /// the start of a UTF-8 codepoint /// @@ -318,6 +331,30 @@ mod tests { buffer.check_valid_utf8(12).unwrap_err(); } + #[test] + fn test_would_overflow() { + // Buffer with 5 bytes already written. + let mut buf = OffsetBuffer::<i32>::default(); + buf.try_push(b"hello", false).unwrap(); // values.len() == 5 + + // Within i32::MAX – should not report overflow. + assert!(!buf.would_overflow(0)); + assert!(!buf.would_overflow(1)); + // 5 + (i32::MAX - 5) == i32::MAX, still representable. + assert!(!buf.would_overflow(i32::MAX as usize - 5)); + // 5 + (i32::MAX - 4) == i32::MAX + 1, overflows i32. + assert!(buf.would_overflow(i32::MAX as usize - 4)); + assert!(buf.would_overflow(i32::MAX as usize)); + // usize::MAX must be caught without panicking. + assert!(buf.would_overflow(usize::MAX)); + + // i64 offset type: the i32 boundary is fine. + let mut buf64 = OffsetBuffer::<i64>::default(); + buf64.try_push(b"hello", false).unwrap(); + assert!(!buf64.would_overflow(i32::MAX as usize - 4)); + assert!(!buf64.would_overflow(i32::MAX as usize)); + } + #[test] fn test_pad_nulls_empty() { let mut buffer = OffsetBuffer::default();