Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,9 @@ impl ByteArrayDecoderDictionary {
return Ok(0);
}

// Pre-reserve offsets capacity to avoid per-chunk reallocation
output.offsets.reserve(len);

self.decoder.read(len, |keys| {
output.extend_from_dictionary(keys, dict.offsets.as_slice(), dict.values.as_slice())
})
Expand Down
99 changes: 35 additions & 64 deletions parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::schema::types::ColumnDescPtr;
use crate::util::utf8::check_valid_utf8;
use arrow_array::{ArrayRef, builder::make_view};
use arrow_buffer::Buffer;
use arrow_data::ByteView;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;
Expand Down Expand Up @@ -475,74 +474,46 @@ impl ByteViewArrayDecoderDictionary {
return Ok(0);
}

// Check if the last few buffers of `output` are the same as the `dict` buffers
// This is to avoid creating new buffers if the same dictionary is used for multiple `read`
let need_to_create_new_buffer = {
if output.buffers.len() >= dict.buffers.len() {
let offset = output.buffers.len() - dict.buffers.len();
output.buffers[offset..]
.iter()
.zip(dict.buffers.iter())
.any(|(a, b)| !a.ptr_eq(b))
} else {
true
}
};
// Check if all dictionary views are inlined (len <= 12).
// If so, we can skip buffer management entirely since inline views
// don't reference any buffers.
let all_views_inlined = dict.views.iter().all(|&v| (v as u32) <= 12);

if !all_views_inlined {
// Check if the last few buffers of `output` are the same as the `dict` buffers
// This is to avoid creating new buffers if the same dictionary is used for multiple `read`
let need_to_create_new_buffer = {
if output.buffers.len() >= dict.buffers.len() {
let offset = output.buffers.len() - dict.buffers.len();
output.buffers[offset..]
.iter()
.zip(dict.buffers.iter())
.any(|(a, b)| !a.ptr_eq(b))
} else {
true
}
};

if need_to_create_new_buffer {
for b in dict.buffers.iter() {
output.buffers.push(b.clone());
if need_to_create_new_buffer {
for b in dict.buffers.iter() {
output.buffers.push(b.clone());
}
}
}

// Calculate the offset of the dictionary buffers in the output buffers
// For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers,
// then the base_buffer_idx is 5 - 2 = 3
let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32;

let mut error = None;
let read = self.decoder.read(len, |keys| {
if base_buffer_idx == 0 {
// the dictionary buffers are the last buffers in output, we can directly use the views
output
.views
.extend(keys.iter().map(|k| match dict.views.get(*k as usize) {
Some(&view) => view,
None => {
if error.is_none() {
error = Some(general_err!("invalid key={} for dictionary", *k));
}
0
}
}));
Ok(())
} else {
output
.views
.extend(keys.iter().map(|k| match dict.views.get(*k as usize) {
Some(&view) => {
let len = view as u32;
if len <= 12 {
view
} else {
let mut view = ByteView::from(view);
view.buffer_index += base_buffer_idx;
view.into()
}
}
None => {
if error.is_none() {
error = Some(general_err!("invalid key={} for dictionary", *k));
}
0
}
}));
Ok(())
}
})?;
if let Some(e) = error {
return Err(e);
}
let base_buffer_idx = if all_views_inlined {
0
} else {
output.buffers.len() as u32 - dict.buffers.len() as u32
};

// Pre-reserve output capacity to avoid per-chunk reallocation in extend
output.views.reserve(len);

let read =
self.decoder
.read_gather_views(len, &dict.views, &mut output.views, base_buffer_idx)?;
Ok(read)
}

Expand Down
87 changes: 83 additions & 4 deletions parquet/src/arrow/decoder/dictionary_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use bytes::Bytes;

use crate::encodings::rle::RleDecoder;
use crate::encodings::rle::{RleDecodedBatch, RleDecoder};
use crate::errors::Result;

/// Decoder for `Encoding::RLE_DICTIONARY` indices
Expand All @@ -27,7 +27,7 @@ pub struct DictIndexDecoder {

/// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded
/// offsets
index_buf: Box<[i32; 1024]>,
index_buf: Vec<i32>,
/// Current length of `index_buf`
index_buf_len: usize,
/// Current offset into `index_buf`. If `index_buf_offset` == `index_buf_len` then we've consumed
Expand All @@ -46,13 +46,14 @@ impl DictIndexDecoder {
let bit_width = data[0];
let mut decoder = RleDecoder::new(bit_width);
decoder.set_data(data.slice(1..))?;
let max_remaining = num_values.unwrap_or(num_levels);

Ok(Self {
decoder,
index_buf: Box::new([0; 1024]),
index_buf: vec![0; max_remaining.min(1024)],
index_buf_len: 0,
index_offset: 0,
max_remaining_values: num_values.unwrap_or(num_levels),
max_remaining_values: max_remaining,
})
}

Expand Down Expand Up @@ -91,6 +92,84 @@ impl DictIndexDecoder {
Ok(values_read)
}

/// Decode up to `len` indices and gather the corresponding dictionary views
/// directly into `output`.
///
/// For RLE runs the repeated view is resolved once and extended into `output`
/// without materializing indices. For bit-packed runs the indices are decoded
/// to a stack-local buffer and gathered immediately.
///
/// `base_buffer_idx` is added to the buffer index of every non-inline view
/// (views longer than 12 bytes reference a data buffer).
///
/// Returns the number of values read, which may be less than `len` if the
/// column is exhausted.
pub fn read_gather_views(
    &mut self,
    len: usize,
    dict_views: &[u128],
    output: &mut Vec<u128>,
    base_buffer_idx: u32,
) -> Result<usize> {
    let to_read = len.min(self.max_remaining_values);
    let mut values_read = 0;

    // Flush any indices left over in the internal buffer from previous reads
    if self.index_offset < self.index_buf_len {
        let n = (self.index_buf_len - self.index_offset).min(to_read);
        let keys = &self.index_buf[self.index_offset..self.index_offset + n];
        Self::gather_views(keys, dict_views, output, base_buffer_idx);
        self.index_offset += n;
        self.max_remaining_values -= n;
        values_read += n;
    }

    if values_read < to_read {
        let read =
            self.decoder
                .get_batch_direct(to_read - values_read, |batch| match batch {
                    RleDecodedBatch::Rle { index, count } => {
                        // Checked lookup: a corrupt/out-of-range dictionary key
                        // yields an empty inline view (0) instead of panicking,
                        // consistent with the clamping done for the bit-packed
                        // path in `gather_views`.
                        let view = dict_views.get(index as usize).copied().unwrap_or(0);
                        let view = if base_buffer_idx == 0 || (view as u32) <= 12 {
                            view
                        } else {
                            let mut bv = arrow_data::ByteView::from(view);
                            bv.buffer_index += base_buffer_idx;
                            bv.into()
                        };
                        output.extend(std::iter::repeat_n(view, count));
                    }
                    RleDecodedBatch::BitPacked(keys) => {
                        Self::gather_views(keys, dict_views, output, base_buffer_idx);
                    }
                })?;
        self.max_remaining_values -= read;
        values_read += read;
    }

    Ok(values_read)
}

/// Gather the views for `keys` from `dict_views` into `output`, adding
/// `base_buffer_idx` to the buffer index of every non-inline view.
fn gather_views(
    keys: &[i32],
    dict_views: &[u128],
    output: &mut Vec<u128>,
    base_buffer_idx: u32,
) {
    // An empty dictionary together with non-empty keys means the data is
    // corrupt. Emit empty inline views (0) so the output length stays
    // consistent with `keys`, and avoid the underflow in
    // `dict_views.len() - 1` below.
    if dict_views.is_empty() {
        output.extend(std::iter::repeat_n(0u128, keys.len()));
        return;
    }
    // Clamp index to valid range to prevent UB on corrupt data.
    // This is branchless (cmp+csel on ARM) and avoids bounds checks in the hot loop.
    let max_idx = dict_views.len() - 1;
    if base_buffer_idx == 0 {
        output.extend(
            keys.iter()
                // SAFETY: the index is clamped to `max_idx`, which is in
                // bounds because `dict_views` is non-empty.
                .map(|k| unsafe { *dict_views.get_unchecked((*k as usize).min(max_idx)) }),
        );
    } else {
        output.extend(keys.iter().map(|k| {
            // SAFETY: the index is clamped to `max_idx`, which is in bounds
            // because `dict_views` is non-empty.
            let view = unsafe { *dict_views.get_unchecked((*k as usize).min(max_idx)) };
            if (view as u32) <= 12 {
                view
            } else {
                let mut bv = arrow_data::ByteView::from(view);
                bv.buffer_index += base_buffer_idx;
                bv.into()
            }
        }));
    }
}

/// Skip up to `to_skip` values, returning the number of values skipped
pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
let to_skip = to_skip.min(self.max_remaining_values);
Expand Down
66 changes: 65 additions & 1 deletion parquet/src/encodings/rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,15 @@ impl RleEncoder {
/// Size, in number of `i32s` of buffer to use for RLE batch reading
const RLE_DECODER_INDEX_BUFFER_SIZE: usize = 1024;

/// A decoded batch from [`RleDecoder::get_batch_direct`].
///
/// Distinguishing the two run types lets callers handle RLE runs without
/// materializing the repeated index `count` times.
#[cfg(feature = "arrow")]
pub enum RleDecodedBatch<'a> {
    /// An RLE run: all values are the same index, repeated `count` times
    Rle { index: i32, count: usize },
    /// A batch of bit-packed indices
    BitPacked(&'a [i32]),
}

/// A RLE/Bit-Packing hybrid decoder.
pub struct RleDecoder {
// Number of bits used to encode the value. Must be between [0, 64].
Expand Down Expand Up @@ -414,6 +423,52 @@ impl RleDecoder {
Ok(values_read)
}

/// Decode up to `max_values` indices and call `f` with each decoded batch.
///
/// For RLE runs, provides [`RleDecodedBatch::Rle`] so callers can fill output directly.
/// For bit-packed runs, provides [`RleDecodedBatch::BitPacked`] with decoded indices.
///
/// Returns the number of values decoded, which may be less than `max_values`
/// if the input is exhausted.
#[cfg(feature = "arrow")]
pub fn get_batch_direct<F>(&mut self, max_values: usize, mut f: F) -> Result<usize>
where
    F: FnMut(RleDecodedBatch<'_>),
{
    let mut values_read = 0;
    // Stack-local scratch for bit-packed runs, sized by the decoder's shared
    // index-buffer constant rather than a duplicated magic number.
    let mut index_buf = [0i32; RLE_DECODER_INDEX_BUFFER_SIZE];
    while values_read < max_values {
        if self.rle_left > 0 {
            let num_values = cmp::min(max_values - values_read, self.rle_left as usize);
            // `current_value` is populated whenever `rle_left > 0`
            let idx = self.current_value.unwrap() as i32;
            f(RleDecodedBatch::Rle {
                index: idx,
                count: num_values,
            });
            self.rle_left -= num_values as u32;
            values_read += num_values;
        } else if self.bit_packed_left > 0 {
            let to_read = (max_values - values_read)
                .min(self.bit_packed_left as usize)
                .min(index_buf.len());
            let bit_reader = self
                .bit_reader
                .as_mut()
                .ok_or_else(|| general_err!("bit_reader should be set"))?;

            let num_values =
                bit_reader.get_batch::<i32>(&mut index_buf[..to_read], self.bit_width as usize);
            if num_values == 0 {
                // The run header claimed more values than the data contains;
                // clear the remaining count so `reload` can make progress.
                self.bit_packed_left = 0;
                continue;
            }
            f(RleDecodedBatch::BitPacked(&index_buf[..num_values]));
            self.bit_packed_left -= num_values as u32;
            values_read += num_values;
        } else if !self.reload()? {
            break;
        }
    }
    Ok(values_read)
}

#[inline(never)]
pub fn skip(&mut self, num_values: usize) -> Result<usize> {
let mut values_skipped = 0;
Expand Down Expand Up @@ -458,6 +513,13 @@ impl RleDecoder {
{
assert!(buffer.len() >= max_values);

if dict.is_empty() {
return Ok(0);
}
// Clamp index to valid range to prevent UB on corrupt data.
// This is branchless (cmp+csel on ARM) and avoids bounds checks in the hot loop.
let max_idx = dict.len() - 1;

let mut values_read = 0;
while values_read < max_values {
let index_buf = self.index_buf.get_or_insert_with(|| Box::new([0; 1024]));
Expand Down Expand Up @@ -497,7 +559,9 @@ impl RleDecoder {
buffer[values_read..values_read + num_values]
.iter_mut()
.zip(index_buf[..num_values].iter())
.for_each(|(b, i)| b.clone_from(&dict[*i as usize]));
.for_each(|(b, i)| unsafe {
b.clone_from(dict.get_unchecked((*i as usize).min(max_idx)));
});
self.bit_packed_left -= num_values as u32;
values_read += num_values;
if num_values < to_read {
Expand Down
Loading