diff --git a/encodings/fastlanes/public-api.lock b/encodings/fastlanes/public-api.lock index 59fc2d85f1b..2533a7d5667 100644 --- a/encodings/fastlanes/public-api.lock +++ b/encodings/fastlanes/public-api.lock @@ -26,7 +26,7 @@ pub fn vortex_fastlanes::bitpack_compress::bitpack_primitive vortex_error::VortexResult -pub unsafe fn vortex_fastlanes::bitpack_compress::bitpack_unchecked(parray: &vortex_array::arrays::primitive::array::PrimitiveArray, bit_width: u8) -> vortex_error::VortexResult +pub unsafe fn vortex_fastlanes::bitpack_compress::bitpack_unchecked(parray: &vortex_array::arrays::primitive::array::PrimitiveArray, bit_width: u8) -> vortex_buffer::ByteBuffer pub fn vortex_fastlanes::bitpack_compress::find_best_bit_width(ptype: vortex_array::dtype::ptype::PType, bit_width_freq: &[usize]) -> vortex_error::VortexResult @@ -48,10 +48,6 @@ pub fn vortex_fastlanes::bitpack_decompress::unpack_single(array: &vortex_fastla pub unsafe fn vortex_fastlanes::bitpack_decompress::unpack_single_primitive(packed: &[T], bit_width: usize, index_to_decode: usize) -> T -pub fn vortex_fastlanes::bitpack_decompress::unpack_to_primitive(array: &vortex_fastlanes::BitPackedArray) -> vortex_array::arrays::primitive::array::PrimitiveArray - -pub fn vortex_fastlanes::bitpack_decompress::unpack_to_primitive_typed(array: &vortex_fastlanes::BitPackedArray) -> vortex_array::arrays::primitive::array::PrimitiveArray - pub mod vortex_fastlanes::unpack_iter pub struct vortex_fastlanes::unpack_iter::BitPackingStrategy @@ -372,11 +368,9 @@ pub fn vortex_fastlanes::DeltaArray::len(&self) -> usize pub fn vortex_fastlanes::DeltaArray::offset(&self) -> usize -pub fn vortex_fastlanes::DeltaArray::try_from_delta_compress_parts(bases: vortex_array::array::ArrayRef, deltas: vortex_array::array::ArrayRef) -> vortex_error::VortexResult - pub fn vortex_fastlanes::DeltaArray::try_from_primitive_array(array: &vortex_array::arrays::primitive::array::PrimitiveArray, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult -pub fn vortex_fastlanes::DeltaArray::try_new(bases: vortex_array::array::ArrayRef, deltas: vortex_array::array::ArrayRef, offset: usize, logical_len: usize) -> vortex_error::VortexResult +pub fn vortex_fastlanes::DeltaArray::try_new(bases: vortex_array::array::ArrayRef, deltas: vortex_array::array::ArrayRef, offset: usize, len: usize) -> vortex_error::VortexResult impl vortex_fastlanes::DeltaArray diff --git a/encodings/fastlanes/src/bit_transpose/validity.rs b/encodings/fastlanes/src/bit_transpose/validity.rs index 13f1c6b0385..826e1a2ad35 100644 --- a/encodings/fastlanes/src/bit_transpose/validity.rs +++ b/encodings/fastlanes/src/bit_transpose/validity.rs @@ -37,7 +37,6 @@ pub fn transpose_validity(validity: &Validity, ctx: &mut ExecutionCtx) -> Vortex } } -#[inline] pub fn transpose_bitbuffer(bits: BitBuffer) -> BitBuffer { let (offset, len, bytes) = bits.into_inner(); @@ -79,11 +78,10 @@ pub fn untranspose_validity(validity: &Validity, ctx: &mut ExecutionCtx) -> Vort } } -#[inline] pub fn untranspose_bitbuffer(bits: BitBuffer) -> BitBuffer { assert!( bits.inner().len().is_multiple_of(128), - "Transpose BitBuffer must be 128-byte aligned" + "Transpose BitBuffer byte length must be a multiple of 128" ); let (offset, len, bytes) = bits.into_inner(); match bytes.try_into_mut() { @@ -137,3 +135,58 @@ fn bits_op_with_copy( offset, ) } + +#[cfg(test)] +mod tests { + use vortex_buffer::BitBuffer; + use vortex_buffer::BitBufferMut; + use vortex_buffer::ByteBuffer; + + use super::*; + + fn make_validity_bits(num_bits: usize) -> BitBuffer { + let mut builder = BitBufferMut::with_capacity(num_bits); + for i in 0..num_bits { + builder.append(i % 3 != 0); + } + builder.freeze() + } + + fn force_copy_path(bits: BitBuffer) -> (BitBuffer, ByteBuffer) { + let (offset, len, bytes) = bits.into_inner(); + let extra_ref = bytes.clone(); + (BitBuffer::new_with_offset(bytes, len, offset), extra_ref) + } + + #[test] + fn transpose_padding_copy_produces_same_bits() { + let bits = make_validity_bits(500); + let transposed = transpose_bitbuffer(bits.clone()); + assert_eq!(transposed.len(), 1024); + let untransposed = untranspose_bitbuffer(transposed); + assert_eq!(untransposed.slice(0..500), bits) + } + + #[test] + fn transpose_inplace_and_copy_produce_same_bits() { + let bits = make_validity_bits(2048); + + let inplace_result = transpose_bitbuffer(bits.clone()); + + let (bits_shared, _hold) = force_copy_path(bits); + let copy_result = transpose_bitbuffer(bits_shared); + + assert_eq!(inplace_result.len(), copy_result.len()); + assert_eq!(inplace_result, copy_result); + } + + #[test] + fn transpose_validity_roundtrip_non_aligned() { + let original_len = 1500; + let bits = make_validity_bits(original_len); + + let transposed = transpose_bitbuffer(bits.clone()); + let roundtripped = untranspose_bitbuffer(transposed); + assert_eq!(bits, roundtripped.slice(0..original_len)); + } +} diff --git a/encodings/fastlanes/src/bitpacking/array/bitpack_compress.rs b/encodings/fastlanes/src/bitpacking/array/bitpack_compress.rs index a3542d1d6a0..6f29a72db0c 100644 --- a/encodings/fastlanes/src/bitpacking/array/bitpack_compress.rs +++ b/encodings/fastlanes/src/bitpacking/array/bitpack_compress.rs @@ -65,7 +65,7 @@ pub fn bitpack_encode( } // SAFETY: we check that array only contains non-negative values. - let packed = unsafe { bitpack_unchecked(array, bit_width)? }; + let packed = unsafe { bitpack_unchecked(array, bit_width) }; let patches = (num_exceptions > 0) .then(|| gather_patches(array, bit_width, num_exceptions)) .transpose()? @@ -103,7 +103,7 @@ pub unsafe fn bitpack_encode_unchecked( bit_width: u8, ) -> VortexResult { // SAFETY: non-negativity of input checked by caller. - let packed = unsafe { bitpack_unchecked(&array, bit_width)? }; + let packed = unsafe { bitpack_unchecked(&array, bit_width) }; // SAFETY: checked by bitpack_unchecked let bitpacked = unsafe { @@ -135,15 +135,11 @@ pub unsafe fn bitpack_encode_unchecked( /// /// It is the caller's responsibility to ensure that `parray` is non-negative before calling /// this function. -pub unsafe fn bitpack_unchecked( - parray: &PrimitiveArray, - bit_width: u8, -) -> VortexResult { +pub unsafe fn bitpack_unchecked(parray: &PrimitiveArray, bit_width: u8) -> ByteBuffer { let parray = parray.reinterpret_cast(parray.ptype().to_unsigned()); - let packed = match_each_unsigned_integer_ptype!(parray.ptype(), |P| { + match_each_unsigned_integer_ptype!(parray.ptype(), |P| { bitpack_primitive(parray.as_slice::

(), bit_width).into_byte_buffer() - }); - Ok(packed) + }) } /// Bitpack a slice of primitives down to the given width. diff --git a/encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs b/encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs index 392c61bcf4c..e4099cdcf24 100644 --- a/encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs +++ b/encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs @@ -3,86 +3,24 @@ use fastlanes::BitPacking; use itertools::Itertools; +use num_traits::AsPrimitive; use vortex_array::ExecutionCtx; -use vortex_array::LEGACY_SESSION; -use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; use vortex_array::builders::ArrayBuilder; use vortex_array::builders::PrimitiveBuilder; use vortex_array::builders::UninitRange; -use vortex_array::dtype::IntegerPType; use vortex_array::dtype::NativePType; -use vortex_array::dtype::Nullability; use vortex_array::match_each_integer_ptype; use vortex_array::match_each_unsigned_integer_ptype; use vortex_array::patches::Patches; use vortex_array::scalar::Scalar; -use vortex_array::validity::Validity; -use vortex_buffer::BufferMut; use vortex_error::VortexExpect; use vortex_error::VortexResult; -use vortex_error::vortex_panic; -use vortex_mask::Mask; use crate::BitPackedArray; use crate::unpack_iter::BitPacked; /// Unpacks a bit-packed array into a primitive array. -pub fn unpack_to_primitive(array: &BitPackedArray) -> PrimitiveArray { - match_each_integer_ptype!(array.ptype(), |P| { unpack_to_primitive_typed::

(array) }) -} - -/// Unpacks a bit-packed array into a typed [`PrimitiveArray`]. -pub fn unpack_to_primitive_typed(array: &BitPackedArray) -> PrimitiveArray { - if array.is_empty() { - let nullability = array.dtype().nullability(); - return PrimitiveArray::empty::

(nullability); - } - - let len = array.len(); - let mut elements = BufferMut::

::with_capacity(len); - let uninit_slice = &mut elements.spare_capacity_mut()[..len]; - - // Decode into an uninitialized slice. - let mut bit_packed_iter = array.unpacked_chunks(); - bit_packed_iter.decode_into(uninit_slice); - // SAFETY: `decode_into` initialized exactly `len` elements into the spare (existing) capacity. - unsafe { elements.set_len(len) }; - - let mut validity = array - .validity_mask() - .vortex_expect("validity_mask") - .into_mut(); - debug_assert_eq!(validity.len(), len); - - // TODO(connor): Implement a fused version of patching instead. - if let Some(patches) = array.patches() { - // SAFETY: - // - `Patches` invariant guarantees indices are sorted and within array bounds. - // - `elements` and `validity` have equal length (both are `len` from the array). - // - All patch indices are valid after offset adjustment (guaranteed by `Patches`). - unsafe { - patches.apply_to_buffer( - &mut elements, - &mut validity, - &mut LEGACY_SESSION.create_execution_ctx(), - ) - }; - } - - // Convert MaskMut -> Mask -> Validity - let validity_mask = validity.freeze(); - let nullability = array.dtype().nullability(); - let validity = if nullability == Nullability::NonNullable { - Validity::NonNullable - } else { - Validity::from_mask(validity_mask, nullability) - }; - - // SAFETY: `elements` buffer has the correct length matching validity. - unsafe { PrimitiveArray::new_unchecked(elements.freeze(), validity) } -} - pub fn unpack_array( array: &BitPackedArray, ctx: &mut ExecutionCtx, @@ -157,43 +95,20 @@ pub fn apply_patches_to_uninit_range_fn T>( let indices = patches.indices().clone().execute::(ctx)?; let values = patches.values().clone().execute::(ctx)?; - let validity = values.validity_mask()?; + assert!(values.all_valid()?, "Patch values must be all valid"); let values = values.as_slice::(); match_each_unsigned_integer_ptype!(indices.ptype(), |P| { - insert_values_and_validity_at_indices_to_uninit_range( - dst, - indices.as_slice::

(), - values, - validity, - patches.offset(), - f, - ) + for (index, &value) in indices.as_slice::

().iter().zip_eq(values) { + dst.set_value( +

>::as_(*index) - patches.offset(), + f(value), + ); + } }); Ok(()) } -fn insert_values_and_validity_at_indices_to_uninit_range< - T: NativePType, - IndexT: IntegerPType, - F: Fn(T) -> T, ->( - dst: &mut UninitRange, - indices: &[IndexT], - values: &[T], - values_validity: Mask, - indices_offset: usize, - f: F, -) { - let Mask::AllTrue(_) = values_validity else { - vortex_panic!("BitPackedArray somehow had nullable patch values"); - }; - - for (index, &value) in indices.iter().zip_eq(values) { - dst.set_value(index.as_() - indices_offset, f(value)); - } -} - pub fn unpack_single(array: &BitPackedArray, index: usize) -> Scalar { let bit_width = array.bit_width() as usize; let ptype = array.ptype(); @@ -481,68 +396,21 @@ mod tests { Ok(()) } - /// Test basic unpacking to primitive array for multiple types and sizes. - #[test] - fn test_unpack_to_primitive_basic() -> VortexResult<()> { - // Test with u8 values. - let u8_values = PrimitiveArray::from_iter([5u8, 10, 15, 20, 25]); - let u8_bitpacked = bitpack_encode(&u8_values, 5, None).unwrap(); - let u8_result = unpack_to_primitive(&u8_bitpacked); - // Compare with existing unpack method. - let expected = unpack_array(&u8_bitpacked, &mut SESSION.create_execution_ctx())?; - assert_eq!(u8_result.len(), expected.len()); - assert_arrays_eq!(u8_result, expected); - - // Test with u32 values - empty array. - let u32_empty: PrimitiveArray = PrimitiveArray::from_iter(Vec::::new()); - let u32_empty_bp = bitpack_encode(&u32_empty, 0, None).unwrap(); - let u32_empty_result = unpack_to_primitive(&u32_empty_bp); - assert_eq!(u32_empty_result.len(), 0); - - // Test with u16 values - exactly one chunk (1024 elements). - let u16_values = PrimitiveArray::from_iter(0u16..1024); - let u16_bitpacked = bitpack_encode(&u16_values, 10, None).unwrap(); - let u16_result = unpack_to_primitive(&u16_bitpacked); - assert_eq!(u16_result.len(), 1024); - - // Test with i32 values - partial chunk (1025 elements). - let i32_values = PrimitiveArray::from_iter((0i32..1025).map(|x| x % 512)); - let i32_bitpacked = bitpack_encode(&i32_values, 9, None).unwrap(); - let i32_result = unpack_to_primitive(&i32_bitpacked); - assert_eq!(i32_result.len(), 1025); - - // Verify consistency: unpack_to_primitive and unpack_array should produce same values. - let i32_array = unpack_array(&i32_bitpacked, &mut SESSION.create_execution_ctx())?; - assert_eq!(i32_result.len(), i32_array.len()); - assert_arrays_eq!(i32_result, i32_array); - Ok(()) - } - /// Test unpacking with patches at various positions. #[test] fn test_unpack_to_primitive_with_patches() -> VortexResult<()> { // Create an array where patches are needed at start, middle, and end. - let values: Vec = vec![ - 2000, // Patch at start + let values = buffer![ + 2000u32, // Patch at start 5, 10, 15, 20, 25, 30, 3000, // Patch in middle 35, 40, 45, 50, 55, 4000, // Patch at end ]; - let array = PrimitiveArray::from_iter(values.clone()); + let array = PrimitiveArray::new(values, Validity::NonNullable); // Bitpack with a small bit width to force patches. let bitpacked = bitpack_encode(&array, 6, None).unwrap(); assert!(bitpacked.patches().is_some(), "Should have patches"); - // Unpack to primitive array. - let result = unpack_to_primitive(&bitpacked); - - // Verify length and that patches were applied. - assert_eq!(result.len(), values.len()); - // The result should match what unpack_array also produces. - let expected = unpack_array(&bitpacked, &mut SESSION.create_execution_ctx())?; - assert_eq!(result.len(), expected.len()); - assert_arrays_eq!(result, expected); - // Test with a larger array with multiple patches across chunks. let large_values: Vec = (0..3072) .map(|i| { @@ -557,7 +425,7 @@ mod tests { let large_bitpacked = bitpack_encode(&large_array, 8, None).unwrap(); assert!(large_bitpacked.patches().is_some()); - let large_result = unpack_to_primitive(&large_bitpacked); + let large_result = unpack_array(&large_bitpacked, &mut SESSION.create_execution_ctx())?; assert_eq!(large_result.len(), 3072); Ok(()) } @@ -571,7 +439,8 @@ mod tests { let array = PrimitiveArray::new(values, validity); let bitpacked = bitpack_encode(&array, 9, None).unwrap(); - let result = unpack_to_primitive(&bitpacked); + let result = + unpack_array(&bitpacked, &mut SESSION.create_execution_ctx()).vortex_expect("unpack"); // Verify length. assert_eq!(result.len(), 7); @@ -588,7 +457,8 @@ mod tests { let patch_bitpacked = bitpack_encode(&patch_array, 5, None).unwrap(); assert!(patch_bitpacked.patches().is_some()); - let patch_result = unpack_to_primitive(&patch_bitpacked); + let patch_result = unpack_array(&patch_bitpacked, &mut SESSION.create_execution_ctx()) + .vortex_expect("unpack"); assert_eq!(patch_result.len(), 7); // Test all nulls edge case. @@ -597,7 +467,8 @@ mod tests { Validity::from_iter([false, false, false, false]), ); let all_nulls_bp = bitpack_encode(&all_nulls, 0, None).unwrap(); - let all_nulls_result = unpack_to_primitive(&all_nulls_bp); + let all_nulls_result = unpack_array(&all_nulls_bp, &mut SESSION.create_execution_ctx()) + .vortex_expect("unpack"); assert_eq!(all_nulls_result.len(), 4); } @@ -608,13 +479,8 @@ mod tests { let test_consistency = |array: &PrimitiveArray, bit_width: u8| -> VortexResult<()> { let bitpacked = bitpack_encode(array, bit_width, None).unwrap(); - // Method 1: Using the new unpack_to_primitive. - let primitive_result = unpack_to_primitive(&bitpacked); - - // Method 2: Using the old unpack_array. let unpacked_array = unpack_array(&bitpacked, &mut SESSION.create_execution_ctx())?; - // Method 3: Using the execute() method (this is what would be used in production). let executed = { let mut ctx = SESSION.create_execution_ctx(); bitpacked @@ -623,12 +489,6 @@ mod tests { .unwrap() }; - // All three should produce the same length. - assert_eq!( - primitive_result.len(), - array.len(), - "primitive length mismatch" - ); assert_eq!( unpacked_array.len(), array.len(), @@ -723,13 +583,13 @@ mod tests { // Empty array. let empty: PrimitiveArray = PrimitiveArray::from_iter(Vec::::new()); let empty_bp = bitpack_encode(&empty, 0, None).unwrap(); - let empty_result = unpack_to_primitive(&empty_bp); + let empty_result = unpack_array(&empty_bp, &mut SESSION.create_execution_ctx())?; assert_eq!(empty_result.len(), 0); // All zeros (bit_width = 0). let zeros = PrimitiveArray::from_iter([0u32; 100]); let zeros_bp = bitpack_encode(&zeros, 0, None).unwrap(); - let zeros_result = unpack_to_primitive(&zeros_bp); + let zeros_result = unpack_array(&zeros_bp, &mut SESSION.create_execution_ctx())?; assert_eq!(zeros_result.len(), 100); // Verify consistency with unpack_array. let zeros_array = unpack_array(&zeros_bp, &mut SESSION.create_execution_ctx())?; @@ -739,7 +599,7 @@ mod tests { // Maximum bit width for u16 (15 bits, since bitpacking requires bit_width < type bit width). let max_values = PrimitiveArray::from_iter([32767u16; 50]); // 2^15 - 1 let max_bp = bitpack_encode(&max_values, 15, None).unwrap(); - let max_result = unpack_to_primitive(&max_bp); + let max_result = unpack_array(&max_bp, &mut SESSION.create_execution_ctx())?; assert_eq!(max_result.len(), 50); // Exactly 3072 elements with patches across chunks. @@ -756,7 +616,7 @@ mod tests { let boundary_bp = bitpack_encode(&boundary_array, 7, None).unwrap(); assert!(boundary_bp.patches().is_some()); - let boundary_result = unpack_to_primitive(&boundary_bp); + let boundary_result = unpack_array(&boundary_bp, &mut SESSION.create_execution_ctx())?; assert_eq!(boundary_result.len(), 3072); // Verify consistency. let boundary_unpacked = unpack_array(&boundary_bp, &mut SESSION.create_execution_ctx())?; @@ -766,7 +626,7 @@ mod tests { // Single element. let single = PrimitiveArray::from_iter([42u8]); let single_bp = bitpack_encode(&single, 6, None).unwrap(); - let single_result = unpack_to_primitive(&single_bp); + let single_result = unpack_array(&single_bp, &mut SESSION.create_execution_ctx())?; assert_eq!(single_result.len(), 1); Ok(()) } diff --git a/encodings/fastlanes/src/delta/array/delta_compress.rs b/encodings/fastlanes/src/delta/array/delta_compress.rs index 8a49d1a7bc8..94862ff26e6 100644 --- a/encodings/fastlanes/src/delta/array/delta_compress.rs +++ b/encodings/fastlanes/src/delta/array/delta_compress.rs @@ -16,6 +16,7 @@ use vortex_buffer::Buffer; use vortex_buffer::BufferMut; use vortex_error::VortexResult; +use crate::FL_CHUNK_SIZE; use crate::bit_transpose::transpose_validity; use crate::fill_forward_nulls; @@ -40,53 +41,46 @@ pub fn delta_compress( Ok((bases, deltas)) } -fn compress_primitive( - array: &[T], -) -> (Buffer, Buffer) { - let padded_len = array.len().next_multiple_of(1024); - let num_chunks = padded_len / 1024; - let bases_len = num_chunks * LANES; +fn compress_primitive(array: &[T]) -> (Buffer, Buffer) +where + T: NativePType + Delta + Transpose, +{ + let padded_len = array.len().next_multiple_of(FL_CHUNK_SIZE); + let bases_len = (padded_len / FL_CHUNK_SIZE) * LANES; // Split into full 1024-element chunks and a remainder. - let (full_chunks, remainder) = array.as_chunks::<1024>(); + let (full_chunks, remainder) = array.as_chunks::(); // Allocate result arrays. let mut bases = BufferMut::with_capacity(bases_len); let mut deltas = BufferMut::with_capacity(padded_len); - let (output_deltas, _) = deltas.spare_capacity_mut().as_chunks_mut::<1024>(); + let (output_deltas, _) = deltas.spare_capacity_mut().as_chunks_mut::(); // Loop over all full 1024-element chunks. - let mut transposed: [T; 1024] = [T::default(); 1024]; - for (chunk, output) in full_chunks.iter().zip(output_deltas.iter_mut()) { - Transpose::transpose(chunk, &mut transposed); + let mut transposed: [T; FL_CHUNK_SIZE] = [T::default(); FL_CHUNK_SIZE]; + let mut process_chunk = |input: &[T; FL_CHUNK_SIZE], output: &mut [MaybeUninit; 1024]| { + Transpose::transpose(input, &mut transposed); bases.extend_from_slice(&transposed[0..T::LANES]); unsafe { Delta::delta::( &transposed, &*(transposed[0..T::LANES].as_ptr().cast()), - mem::transmute::<&mut [MaybeUninit; 1024], &mut [T; 1024]>(output), + mem::transmute::<&mut [MaybeUninit; FL_CHUNK_SIZE], &mut [T; FL_CHUNK_SIZE]>( + output, + ), ); } + }; + for (chunk, output) in full_chunks.iter().zip(output_deltas.iter_mut()) { + process_chunk(chunk, output); } // Pad the remainder to 1024 elements and process as a full chunk. if !remainder.is_empty() { - let mut padded_chunk = [T::default(); 1024]; + let mut padded_chunk = [T::default(); FL_CHUNK_SIZE]; padded_chunk[..remainder.len()].copy_from_slice(remainder); - - Transpose::transpose(&padded_chunk, &mut transposed); - bases.extend_from_slice(&transposed[0..T::LANES]); - - unsafe { - Delta::delta::( - &transposed, - &*(transposed[0..T::LANES].as_ptr().cast()), - mem::transmute::<&mut [MaybeUninit; 1024], &mut [T; 1024]>( - &mut output_deltas[full_chunks.len()], - ), - ); - } + process_chunk(&padded_chunk, &mut output_deltas[full_chunks.len()]); } unsafe { deltas.set_len(padded_len) }; @@ -102,6 +96,8 @@ mod tests { use std::sync::LazyLock; use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::ToCanonical; use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; use vortex_array::assert_arrays_eq; @@ -110,7 +106,9 @@ mod tests { use vortex_session::VortexSession; use crate::DeltaArray; + use crate::bitpack_compress::bitpack_encode; use crate::delta::array::delta_decompress::delta_decompress; + use crate::delta_compress; static SESSION: LazyLock = LazyLock::new(|| VortexSession::empty().with::()); @@ -135,12 +133,6 @@ mod tests { /// sum propagates corrupted values from null positions. #[test] fn delta_bitpacked_trailing_nulls() { - use vortex_array::IntoArray; - use vortex_array::ToCanonical; - - use crate::bitpack_compress::bitpack_encode; - use crate::delta_compress; - let array = PrimitiveArray::from_option_iter( (0u8..200).map(|i| (!(50..100).contains(&i)).then_some(i)), ); diff --git a/encodings/fastlanes/src/delta/array/mod.rs b/encodings/fastlanes/src/delta/array/mod.rs index 0cefe4c9fa9..1b98d97522e 100644 --- a/encodings/fastlanes/src/delta/array/mod.rs +++ b/encodings/fastlanes/src/delta/array/mod.rs @@ -77,23 +77,18 @@ impl DeltaArray { Self::try_new(bases.into_array(), deltas.into_array(), 0, logical_len) } - /// Create a [`DeltaArray`] from the given `bases` and `deltas` arrays. - /// Note the `deltas` might be nullable - pub fn try_from_delta_compress_parts(bases: ArrayRef, deltas: ArrayRef) -> VortexResult { - let logical_len = deltas.len(); - Self::try_new(bases, deltas, 0, logical_len) - } - + /// Create a [`DeltaArray`] from the given `bases` and `deltas` arrays + /// with given `offset` into first chunk and `logical_len` length. pub fn try_new( bases: ArrayRef, deltas: ArrayRef, offset: usize, - logical_len: usize, + len: usize, ) -> VortexResult { vortex_ensure!(offset < 1024, "offset must be less than 1024: {offset}"); vortex_ensure!( - offset + logical_len <= deltas.len(), - "offset + logical_len, {offset} + {logical_len}, must be less than or equal to the size of deltas: {}", + offset + len <= deltas.len(), + "offset + len, {offset} + {len}, must be less than or equal to the size of deltas: {}", deltas.len() ); vortex_ensure!( @@ -123,7 +118,7 @@ impl DeltaArray { ); // SAFETY: validation done above - Ok(unsafe { Self::new_unchecked(bases, deltas, offset, logical_len) }) + Ok(unsafe { Self::new_unchecked(bases, deltas, offset, len) }) } pub(crate) unsafe fn new_unchecked( diff --git a/encodings/fastlanes/src/rle/array/rle_compress.rs b/encodings/fastlanes/src/rle/array/rle_compress.rs index 8eec282e844..51cf30c447e 100644 --- a/encodings/fastlanes/src/rle/array/rle_compress.rs +++ b/encodings/fastlanes/src/rle/array/rle_compress.rs @@ -2,7 +2,6 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use arrayref::array_mut_ref; -use arrayref::array_ref; use fastlanes::RLE; use vortex_array::IntoArray; use vortex_array::ToCanonical; @@ -18,6 +17,7 @@ use vortex_error::VortexResult; use crate::FL_CHUNK_SIZE; use crate::RLEArray; +use crate::fill_forward_nulls; impl RLEArray { /// Encodes a primitive array of unsigned integers using FastLanes RLE. @@ -34,7 +34,9 @@ where T: NativePType + RLE, NativeValue: RLE, { - let values = array.as_slice::(); + // Fill-forward null values so the RLE encoder doesn't see garbage at null positions, + // which would create spurious run boundaries and inflate the dictionary. + let values = fill_forward_nulls(array.to_buffer::(), array.validity()); let len = values.len(); let padded_len = len.next_multiple_of(FL_CHUNK_SIZE); @@ -49,7 +51,7 @@ where let indices_uninit = indices_buf.spare_capacity_mut(); let mut value_count_acc = 0; // Chunk value count prefix sum. - let mut chunks = values.chunks_exact(FL_CHUNK_SIZE); + let (chunks, remainder) = values.as_chunks::(); let mut process_chunk = |chunk_start_idx: usize, input: &[T; FL_CHUNK_SIZE]| { // SAFETY: NativeValue is repr(transparent) @@ -76,14 +78,10 @@ where value_count_acc += value_count; }; - for (chunk_idx, chunk_slice) in chunks.by_ref().enumerate() { - process_chunk( - chunk_idx * FL_CHUNK_SIZE, - array_ref![chunk_slice, 0, FL_CHUNK_SIZE], - ); + for (chunk_idx, chunk_slice) in chunks.iter().enumerate() { + process_chunk(chunk_idx * FL_CHUNK_SIZE, chunk_slice); } - let remainder = chunks.remainder(); if !remainder.is_empty() { // Repeat the last value for padding to prevent // accounting for an additional value change. @@ -142,31 +140,32 @@ mod tests { use vortex_array::assert_arrays_eq; use vortex_array::dtype::half::f16; use vortex_buffer::Buffer; + use vortex_buffer::buffer; use super::*; #[test] fn test_encode_decode() { // u8 - let values_u8: Buffer = [1, 1, 2, 2, 3, 3].iter().copied().collect(); - let array_u8 = values_u8.into_array(); - let encoded_u8 = RLEArray::encode(&array_u8.to_primitive()).unwrap(); + let array_u8: Buffer = buffer![1, 1, 2, 2, 3, 3]; + let encoded_u8 = + RLEArray::encode(&PrimitiveArray::new(array_u8, Validity::NonNullable)).unwrap(); let decoded_u8 = encoded_u8.to_primitive(); let expected_u8 = PrimitiveArray::from_iter(vec![1u8, 1, 2, 2, 3, 3]); assert_arrays_eq!(decoded_u8, expected_u8); // u16 - let values_u16: Buffer = [100, 100, 200, 200].iter().copied().collect(); - let array_u16 = values_u16.into_array(); - let encoded_u16 = RLEArray::encode(&array_u16.to_primitive()).unwrap(); + let array_u16: Buffer = buffer![100, 100, 200, 200]; + let encoded_u16 = + RLEArray::encode(&PrimitiveArray::new(array_u16, Validity::NonNullable)).unwrap(); let decoded_u16 = encoded_u16.to_primitive(); let expected_u16 = PrimitiveArray::from_iter(vec![100u16, 100, 200, 200]); assert_arrays_eq!(decoded_u16, expected_u16); // u64 - let values_u64: Buffer = [1000, 1000, 2000].iter().copied().collect(); - let array_u64 = values_u64.into_array(); - let encoded_u64 = RLEArray::encode(&array_u64.to_primitive()).unwrap(); + let array_u64: Buffer = buffer![1000, 1000, 2000]; + let encoded_u64 = + RLEArray::encode(&PrimitiveArray::new(array_u64, Validity::NonNullable)).unwrap(); let decoded_u64 = encoded_u64.to_primitive(); let expected_u64 = PrimitiveArray::from_iter(vec![1000u64, 1000, 2000]); assert_arrays_eq!(decoded_u64, expected_u64); @@ -174,17 +173,17 @@ mod tests { #[test] fn test_length() { - let values: Buffer = [1, 1, 2, 2, 2, 3].iter().copied().collect(); - let array = values.into_array(); - let encoded = RLEArray::encode(&array.to_primitive()).unwrap(); + let values: Buffer = buffer![1, 1, 2, 2, 2, 3]; + let encoded = + RLEArray::encode(&PrimitiveArray::new(values, Validity::NonNullable)).unwrap(); assert_eq!(encoded.len(), 6); } #[test] fn test_empty_length() { let values: Buffer = Buffer::empty(); - let array = values.into_array(); - let encoded = RLEArray::encode(&array.to_primitive()).unwrap(); + let encoded = + RLEArray::encode(&PrimitiveArray::new(values, Validity::NonNullable)).unwrap(); assert_eq!(encoded.len(), 0); assert_eq!(encoded.values().len(), 0); @@ -193,9 +192,9 @@ mod tests { #[test] fn test_single_value() { let values: Buffer = vec![42; 2000].into_iter().collect(); - let array = values.into_array(); - let encoded = RLEArray::encode(&array.to_primitive()).unwrap(); + let encoded = + RLEArray::encode(&PrimitiveArray::new(values, Validity::NonNullable)).unwrap(); assert_eq!(encoded.values().len(), 2); // 2 chunks, each storing value 42 let decoded = encoded.to_primitive(); // Verify round-trip @@ -206,9 +205,9 @@ mod tests { #[test] fn test_all_different() { let values: Buffer = (0u8..=255).collect(); - let array = values.into_array(); - let encoded = RLEArray::encode(&array.to_primitive()).unwrap(); + let encoded = + RLEArray::encode(&PrimitiveArray::new(values, Validity::NonNullable)).unwrap(); assert_eq!(encoded.values().len(), 256); let decoded = encoded.to_primitive(); // Verify round-trip @@ -220,9 +219,9 @@ mod tests { fn test_partial_last_chunk() { // Test array with partial last chunk (not divisible by 1024) let values: Buffer = (0..1500).map(|i| (i / 100) as u32).collect(); - let array = values.into_array(); + let array = PrimitiveArray::new(values, Validity::NonNullable); - let encoded = RLEArray::encode(&array.to_primitive()).unwrap(); + let encoded = RLEArray::encode(&array).unwrap(); assert_eq!(encoded.len(), 1500); assert_arrays_eq!(encoded, array); @@ -234,9 +233,9 @@ mod tests { fn test_two_full_chunks() { // Array that spans exactly 2 chunks (2048 elements) let values: Buffer = (0..2048).map(|i| (i / 100) as u32).collect(); - let array = values.into_array(); + let array = PrimitiveArray::new(values, Validity::NonNullable); - let encoded = RLEArray::encode(&array.to_primitive()).unwrap(); + let encoded = RLEArray::encode(&array).unwrap(); assert_eq!(encoded.len(), 2048); assert_arrays_eq!(encoded, array); diff --git a/encodings/fastlanes/src/rle/array/rle_decompress.rs b/encodings/fastlanes/src/rle/array/rle_decompress.rs index 07489d2c5f9..4bd5ac276ec 100644 --- a/encodings/fastlanes/src/rle/array/rle_decompress.rs +++ b/encodings/fastlanes/src/rle/array/rle_decompress.rs @@ -56,7 +56,7 @@ where let indices = array.indices().clone().execute::(ctx)?; let indices = indices.as_slice::(); - assert_eq!(indices.len() % FL_CHUNK_SIZE, 0); + assert!(indices.len().is_multiple_of(FL_CHUNK_SIZE)); let chunk_start_idx = array.offset() / FL_CHUNK_SIZE; let chunk_end_idx = (array.offset() + array.len()).div_ceil(FL_CHUNK_SIZE); diff --git a/vortex-btrblocks/src/compressor/patches.rs b/vortex-btrblocks/src/compressor/patches.rs index 46cff3a16bf..29612b56a8c 100644 --- a/vortex-btrblocks/src/compressor/patches.rs +++ b/vortex-btrblocks/src/compressor/patches.rs @@ -1,11 +1,13 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use vortex_array::ArrayRef; use vortex_array::DynArray; use vortex_array::IntoArray; use vortex_array::ToCanonical; use vortex_array::arrays::ConstantArray; use vortex_array::patches::Patches; +use vortex_error::VortexError; use vortex_error::VortexResult; /// Compresses the given patches by downscaling integers and checking for constant values. @@ -24,13 +26,17 @@ pub fn compress_patches(patches: &Patches) -> VortexResult { } else { values.clone() }; + let chunk_offsets = patches + .chunk_offsets() + .as_ref() + .map(|offsets| Ok::(offsets.to_primitive().narrow()?.into_array())) + .transpose()?; Patches::new( patches.array_len(), patches.offset(), indices, values, - // TODO(0ax1): chunk offsets - None, + chunk_offsets, ) } diff --git a/vortex-btrblocks/src/compressor/rle.rs b/vortex-btrblocks/src/compressor/rle.rs index ef4b3fcb048..baa3e7b2656 100644 --- a/vortex-btrblocks/src/compressor/rle.rs +++ b/vortex-btrblocks/src/compressor/rle.rs @@ -187,6 +187,11 @@ fn try_compress_delta( let compressed_deltas = compressor.compress_canonical(Canonical::Primitive(deltas), ctx, excludes)?; - vortex_fastlanes::DeltaArray::try_from_delta_compress_parts(compressed_bases, compressed_deltas) - .map(vortex_fastlanes::DeltaArray::into_array) + vortex_fastlanes::DeltaArray::try_new( + compressed_bases, + compressed_deltas, + 0, + primitive_array.len(), + ) + .map(vortex_fastlanes::DeltaArray::into_array) } diff --git a/vortex/benches/single_encoding_throughput.rs b/vortex/benches/single_encoding_throughput.rs index 1c534eb1325..4776afa4a52 100644 --- a/vortex/benches/single_encoding_throughput.rs +++ b/vortex/benches/single_encoding_throughput.rs @@ -152,8 +152,7 @@ fn bench_delta_compress_u32(bencher: Bencher) { .with_inputs(|| &uint_array) .bench_refs(|a| { let (bases, deltas) = delta_compress(a, &mut SESSION.create_execution_ctx()).unwrap(); - DeltaArray::try_from_delta_compress_parts(bases.into_array(), deltas.into_array()) - .unwrap() + DeltaArray::try_new(bases.into_array(), deltas.into_array(), 0, a.len()).unwrap() }); } @@ -162,7 +161,7 @@ fn bench_delta_decompress_u32(bencher: Bencher) { let (uint_array, ..) = setup_primitive_arrays(); let (bases, deltas) = delta_compress(&uint_array, &mut SESSION.create_execution_ctx()).unwrap(); let compressed = - DeltaArray::try_from_delta_compress_parts(bases.into_array(), deltas.into_array()).unwrap(); + DeltaArray::try_new(bases.into_array(), deltas.into_array(), 0, uint_array.len()).unwrap(); with_byte_counter(bencher, NUM_VALUES * 4) .with_inputs(|| &compressed)