diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 34aac8b08aa0..5a33ebdb3742 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -478,7 +478,7 @@ mod test {
             .unwrap();
         assert_eq!(
             err.to_string(),
-            "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..357"
+            "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..392"
         );
     }
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 290a887b2960..bc7e63c4ff07 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -284,10 +284,12 @@ impl Sbbf {
     /// flush the writer in order to boost performance of bulk writing all blocks. Caller
     /// must remember to flush the writer.
     pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
-        let mut protocol = ThriftCompactOutputProtocol::new(&mut writer);
-        self.header().write_thrift(&mut protocol).map_err(|e| {
-            ParquetError::General(format!("Could not write bloom filter header: {e}"))
-        })?;
+        {
+            let mut protocol = ThriftCompactOutputProtocol::new(&mut writer);
+            self.header().write_thrift(&mut protocol).map_err(|e| {
+                ParquetError::General(format!("Could not write bloom filter header: {e}"))
+            })?;
+        }
         self.write_bitset(&mut writer)?;
         Ok(())
     }
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 1c8f3e9c69ba..629eeb62f582 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -1628,8 +1628,10 @@ mod tests {
             .unwrap();

         let mut buf = Vec::new();
-        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
-        row_group_meta.write_thrift(&mut writer).unwrap();
+        {
+            let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
+            row_group_meta.write_thrift(&mut writer).unwrap();
+        }

         let row_group_res = read_row_group(&mut buf, schema_descr).unwrap();
@@ -1710,8 +1712,10 @@ mod tests {
             .build()
             .unwrap();
         let mut buf = Vec::new();
-        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
-        row_group_meta_2cols.write_thrift(&mut writer).unwrap();
+        {
+            let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
+            row_group_meta_2cols.write_thrift(&mut writer).unwrap();
+        }

         let err = read_row_group(&mut buf, schema_descr_3cols)
             .unwrap_err()
@@ -1761,8 +1765,10 @@ mod tests {
             .unwrap();

         let mut buf = Vec::new();
-        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
-        col_metadata.write_thrift(&mut writer).unwrap();
+        {
+            let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
+            col_metadata.write_thrift(&mut writer).unwrap();
+        }

         let col_chunk_res = read_column_chunk(&mut buf, column_descr).unwrap();
         assert_eq!(col_chunk_res, col_metadata);
@@ -1777,8 +1783,10 @@ mod tests {
             .unwrap();

         let mut buf = Vec::new();
-        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
-        col_metadata.write_thrift(&mut writer).unwrap();
+        {
+            let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
+            col_metadata.write_thrift(&mut writer).unwrap();
+        }

         let col_chunk_res = read_column_chunk(&mut buf, column_descr).unwrap();
         assert_eq!(col_chunk_res, col_metadata);
diff --git a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs
index 14774910961f..87270dff130e 100644
--- a/parquet/src/file/metadata/thrift/mod.rs
+++ b/parquet/src/file/metadata/thrift/mod.rs
@@ -38,7 +38,7 @@ use crate::{
         ColumnOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType,
         Repetition, Type,
     },
-    data_type::{ByteArray, FixedLenByteArray, Int96},
+    data_type::{AsBytes, ByteArray, FixedLenByteArray, Int96},
     errors::{ParquetError, Result},
     file::{
         metadata::{
@@ -49,7 +49,7 @@ use crate::{
         statistics::ValueStatistics,
     },
     parquet_thrift::{
-        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
+        ElementType, FieldType, MetadataIndexBuilder, ReadThrift, ThriftCompactInputProtocol,
         ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
         read_thrift_vec,
     },
@@ -499,6 +499,7 @@ fn read_column_metadata<'a>(
 fn read_column_chunk<'a>(
     prot: &mut ThriftSliceInputProtocol<'a>,
     column_descr: &Arc<ColumnDescriptor>,
+    col_meta_len: Option<usize>,
 ) -> Result<ColumnChunkMetaData> {
     // create a default initialized ColumnMetaData
     let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;
@@ -535,7 +536,14 @@ fn read_column_chunk<'a>(
                 has_file_offset = true;
             }
             3 => {
-                col_meta_mask = read_column_metadata(&mut *prot, &mut col)?;
+                if let Some(meta_len) = col_meta_len {
+                    let meta_bytes = &prot.as_slice()[..meta_len];
+                    let mut meta_prot = ThriftSliceInputProtocol::new(meta_bytes);
+                    col_meta_mask = read_column_metadata(&mut meta_prot, &mut col)?;
+                    prot.skip_bytes(meta_len)?;
+                } else {
+                    col_meta_mask = read_column_metadata(&mut *prot, &mut col)?;
+                }
             }
             4 => {
                 col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
@@ -585,6 +593,7 @@ fn read_column_chunk<'a>(
 fn read_row_group(
     prot: &mut ThriftSliceInputProtocol,
     schema_descr: &Arc<SchemaDescriptor>,
+    index: Option<MetadataIndexSlice<'_>>,
 ) -> Result<RowGroupMetaData> {
     // create default initialized RowGroupMetaData
     let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();
@@ -622,9 +631,25 @@ fn read_row_group(
                         list_ident.size
                     ));
                 }
-                for i in 0..list_ident.size as usize {
-                    let col = read_column_chunk(prot, &schema_descr.columns()[i])?;
-                    row_group.columns.push(col);
+                if let Some(meta_idx) = index.as_ref() {
+                    for i in 0..list_ident.size as usize {
+                        let col_len = meta_idx.column_chunk_len(i);
+                        let col_meta_len = meta_idx.column_meta_len(i);
+                        let col_bytes = &prot.as_slice()[..col_len];
+                        let mut col_prot = ThriftSliceInputProtocol::new(col_bytes);
+                        let col = read_column_chunk(
+                            &mut col_prot,
+                            &schema_descr.columns()[i],
+                            Some(col_meta_len),
+                        )?;
+                        row_group.columns.push(col);
+                        prot.skip_bytes(col_len)?;
+                    }
+                } else {
+                    for i in 0..list_ident.size as usize {
+                        let col = read_column_chunk(prot, &schema_descr.columns()[i], None)?;
+                        row_group.columns.push(col);
+                    }
                 }
                 mask |= RG_COLUMNS;
             }
@@ -669,10 +694,62 @@ fn read_row_group(
     Ok(row_group)
 }

+/// Extract the metadata index from the footer bytes. `buf` should contain the entire footer.
+pub(crate) fn get_metadata_index(buf: &[u8]) -> Result<Option<MetaIndex>> {
+    // TODO(ets): need constants to get rid of magic numbers
+    if buf.len() < 13 {
+        return Ok(None);
+    }
+    // check the last 4 bytes to see if we have the full footer or not
+    let magic = &buf[buf.len() - 4..];
+    let buf = if magic == "PAR1".as_bytes() {
+        &buf[0..buf.len() - 8]
+    } else {
+        buf
+    };
+
+    // check for PARI followed by 0.
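+    // expected trailer layout (as written by `FileMeta::write_thrift` in this file):
+    //   ...[MetaIndex thrift bytes][8-byte index length][b"PARI"][0x00 struct stop]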
+    if buf[buf.len() - 1] != 0 {
+        return Ok(None);
+    }
+    let magic = &buf[buf.len() - 5..buf.len() - 1];
+    if magic != "PARI".as_bytes() {
+        return Ok(None);
+    }
+
+    // TODO(ets): i64 is extreme here...should be i32
+    let idx_len =
+        i64::from_le_bytes((&buf[buf.len() - 13..buf.len() - 5]).try_into().unwrap()) as usize;
+    let buf = &buf[buf.len() - 13 - idx_len..buf.len() - 13];
+
+    let mut prot = ThriftSliceInputProtocol::new(buf);
+    let idx = MetaIndex::read_thrift(&mut prot)?;
+
+    Ok(Some(idx))
+}
+
+struct MetadataIndexSlice<'a> {
+    col_chunk_offsets: &'a [i64],
+    col_meta_lengths: &'a [i64],
+}
+
+impl<'a> MetadataIndexSlice<'a> {
+    // This assumes `is_valid` has been called on the `MetaIndex` this came from.
+    fn column_chunk_len(&self, col_idx: usize) -> usize {
+        (self.col_chunk_offsets[col_idx + 1] - self.col_chunk_offsets[col_idx]) as usize
+    }
+
+    // This assumes `is_valid` has been called on the `MetaIndex` this came from.
+    fn column_meta_len(&self, col_idx: usize) -> usize {
+        self.col_meta_lengths[col_idx] as usize
+    }
+}
+
 /// Create [`ParquetMetaData`] from thrift input. Note that this only decodes the file metadata in
 /// the Parquet footer. Page indexes will need to be added later.
 pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData> {
-    let mut prot = ThriftSliceInputProtocol::new(buf);
+    // check for index
+    let index = get_metadata_index(buf)?;

     // begin reading the file metadata
     let mut version: Option<i32> = None;
@@ -689,6 +766,23 @@ pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData
     // this will need to be set before parsing row groups
     let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;

+    // if the index is available, read the schema first
+    if let Some(meta_idx) = index.as_ref() {
+        let start = meta_idx.schema_offset as usize;
+        let end = start + meta_idx.schema_length as usize;
+        let schema_bytes = &buf[start..end];
+
+        let mut prot = ThriftSliceInputProtocol::new(schema_bytes);
+        // TODO: could match here and ignore the index if it's wrong about the schema location
+        let val = read_thrift_vec::<SchemaElement, _>(&mut prot)?;
+        let val = parquet_schema_from_array(val)?;
+        schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
+    }
+
+    // TODO(ets): if only reading the schema, return now
+
+    let mut prot = ThriftSliceInputProtocol::new(buf);
+
     // struct FileMetaData {
     //   1: required i32 version
     //   2: required list<SchemaElement> schema;
@@ -711,10 +805,20 @@ pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData
                 version = Some(i32::read_thrift(&mut prot)?);
             }
             2 => {
-                // read schema and convert to SchemaDescriptor for use when reading row groups
-                let val = read_thrift_vec::<SchemaElement, _>(&mut prot)?;
-                let val = parquet_schema_from_array(val)?;
-                schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
+                // if we already have a schema, then don't decode it again
+                if schema_descr.is_some() {
+                    if let Some(meta_idx) = index.as_ref() {
+                        prot.skip_bytes(meta_idx.schema_length as usize)?;
+                    } else {
+                        prot.skip(field_ident.field_type)?;
+                    }
+                } else {
+                    // read schema and convert to SchemaDescriptor for use when reading row groups
+                    let val =
+                        read_thrift_vec::<SchemaElement, _>(&mut prot)?;
+                    let val = parquet_schema_from_array(val)?;
+                    schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
+                }
             }
             3 => {
                 num_rows = Some(i64::read_thrift(&mut prot)?);
@@ -725,9 +829,44 @@ pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData
                 }
                 let schema_descr = schema_descr.as_ref().unwrap();
                 let list_ident = prot.read_list_begin()?;
-                let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
-                for _ in 0..list_ident.size {
-                    rg_vec.push(read_row_group(&mut prot, schema_descr)?);
+                let num_rg = list_ident.size as usize;
+                let mut rg_vec = Vec::with_capacity(num_rg);
+
+                // if there's an index and it's properly sized, then use it to read the row groups.
+                // the following is unstable before Rust 1.88.0 / the 2024 edition,
+                // see https://github.com/rust-lang/rust/issues/53667
+                /*if let Some(meta_idx) = index.as_ref()
+                    && meta_idx.is_valid(num_rg, schema_descr.num_columns())
+                {
+                    for i in 0..num_rg {
+                        let slice = meta_idx.get_slice(i, schema_descr);
+                        let rg_len = meta_idx.row_group_len(i);
+                        let rg_bytes = &prot.as_slice()[..rg_len];
+                        let mut rg_prot = ThriftSliceInputProtocol::new(rg_bytes);
+                        rg_vec.push(read_row_group(&mut rg_prot, schema_descr, Some(slice))?);
+                        prot.skip_bytes(rg_len)?;
+                    }
+                } else {
+                    for _ in 0..list_ident.size {
+                        rg_vec.push(read_row_group(&mut prot, schema_descr, None)?);
+                    }
+                }*/
+                match index.as_ref() {
+                    Some(meta_idx) if meta_idx.is_valid(num_rg, schema_descr.num_columns()) => {
+                        for i in 0..num_rg {
+                            let slice = meta_idx.get_slice(i, schema_descr);
+                            let rg_len = meta_idx.row_group_len(i);
+                            let rg_bytes = &prot.as_slice()[..rg_len];
+                            let mut rg_prot = ThriftSliceInputProtocol::new(rg_bytes);
+                            rg_vec.push(read_row_group(&mut rg_prot, schema_descr, Some(slice))?);
+                            prot.skip_bytes(rg_len)?;
+                        }
+                    }
+                    _ => {
+                        for _ in 0..list_ident.size {
+                            rg_vec.push(read_row_group(&mut prot, schema_descr, None)?);
+                        }
+                    }
                 }
                 row_groups = Some(rg_vec);
             }
@@ -1250,6 +1389,77 @@ pub(super) struct FileMeta<'a> {
     pub(super) row_groups: &'a Vec<RowGroupMetaData>,
 }

+// TODO(ets): things to consider
+//
+// To keep this as small as possible, we could encode a starting offset and then lengths
+// for row groups and column chunks. Then we only need N lengths rather than N + 1 offsets,
+// and the lengths will be approximately the same size, and could be much smaller than the
+// final offset. Then we'd just do an exclusive scan of the sizes to get the offsets array.
+//
+// In tandem with the above, we could change the column offsets to be relative to the start of
+// the row group that contains them.
+//
+// We could skip thrift encoding, and just give the byte length for each array; then we'd have
+// random access to the index as well.
+thrift_struct!(
+pub(crate) struct MetaIndex {
+  /// starting offset relative to start of metadata for the schema
+  1: required i64 schema_offset
+  /// number of bytes for the serialized schema
+  2: required i64 schema_length
+  /// vector of starting offsets for the row groups. length is num_row_group + 1 to
+  /// accommodate the end of the last row group
+  3: required list<i64> row_group_offsets
+  /// vector of offsets to column metadata. size is n_row_group * (num_columns + 1).
+  /// flattened matrix of offsets for each column in a row group, with the offset of the
+  /// end of the last column as the last element of each row.
+  4: required list<i64> column_offsets
+  /// vector of lengths for the `ColumnMetaData` portion of the column metadata. this
+  /// would allow for skipping elements like statistics quickly.
+  /// size is n_row_group * num_columns.
+  5: required list<i64> column_meta_lengths
+}
+);
+
+impl MetaIndex {
+    /// Perform a check that the lists are appropriately sized. This must be called before using
+    /// any of the other functions in this impl.
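+    ///
+    /// For example, with 2 row groups and 3 columns this expects 3 entries in
+    /// `row_group_offsets`, 8 in `column_offsets`, and 6 in `column_meta_lengths`.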
+    fn is_valid(&self, num_rg: usize, num_col: usize) -> bool {
+        self.row_group_offsets.len() == num_rg + 1
+            && self.column_meta_lengths.len() == num_rg * num_col
+            && self.column_offsets.len() == num_rg * (num_col + 1)
+    }
+
+    /// Return the length of the row group at the given index.
+    ///
+    /// This may panic if [`Self::is_valid`] is not `true` or `rg_idx` is out of bounds.
+    fn row_group_len(&self, rg_idx: usize) -> usize {
+        (self.row_group_offsets[rg_idx + 1] - self.row_group_offsets[rg_idx]) as usize
+    }
+
+    /// Return a slice of the index for the row group at the given index.
+    ///
+    /// This may panic if [`Self::is_valid`] is not `true` or `rg_idx` is out of bounds.
+    fn get_slice(
+        &'_ self,
+        rg_idx: usize,
+        schema_descr: &Arc<SchemaDescriptor>,
+    ) -> MetadataIndexSlice<'_> {
+        let num_cols = schema_descr.num_columns();
+        let start = rg_idx * (num_cols + 1);
+        let end = start + num_cols + 1;
+        let col_chunk_offsets = &self.column_offsets[start..end];
+        let start = rg_idx * num_cols;
+        let end = start + num_cols;
+        let col_meta_lengths = &self.column_meta_lengths[start..end];
+
+        MetadataIndexSlice {
+            col_chunk_offsets,
+            col_meta_lengths,
+        }
+    }
+}
+
 // struct FileMetaData {
 //   1: required i32 version
 //   2: required list<SchemaElement> schema;
@@ -1267,6 +1477,15 @@ impl<'a> WriteThrift for FileMeta<'a> {
     // needed for last_field_id w/o encryption
     #[allow(unused_assignments)]
     fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
+        // add indexing to the protocol writer
+        let metadata_index = MetadataIndexBuilder::new(
+            writer.bytes_written(),
+            self.row_groups.len(),
+            self.file_metadata.schema_descr().num_columns(),
+        );
+        writer.set_index(metadata_index);
+
+        // field 1 is version
         self.file_metadata
             .version
             .write_thrift_field(writer, 1, 0)?;
@@ -1276,16 +1495,38 @@ impl<'a> WriteThrift for FileMeta<'a> {
         let root = self.file_metadata.schema_descr().root_schema_ptr();
         let schema_len = num_nodes(&root)?;
         writer.write_field_begin(FieldType::List, 2, 1)?;
+
+        // schema range needs to include the list header
+        let schema_start = writer.bytes_written();
         writer.write_list_begin(ElementType::Struct, schema_len)?;
         // recursively write Type nodes as SchemaElements
         write_schema(&root, writer)?;

+        let schema_end = writer.bytes_written();
+        writer.set_schema_range(schema_start, schema_end);
+
         // field 3 is num_rows
         self.file_metadata
             .num_rows
             .write_thrift_field(writer, 3, 2)?;

-        // this will call RowGroupMetaData::write_thrift
-        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
+        // field 4 is row_groups
+        // write row groups manually so we can track their start positions relative to the
+        // start of the metadata.
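+        // (the generic `self.row_groups.write_thrift_field(writer, 4, 3)` path removed
+        // above offered no hook for recording per-row-group offsets)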
+
+        // field header
+        writer.write_field_begin(FieldType::List, 4, 3)?;
+        // list header
+        writer.write_list_begin(ElementType::Struct, self.row_groups.len())?;
+        // write row groups and save positions
+        for rg in self.row_groups {
+            writer.mark_row_group();
+            rg.write_thrift(writer)?;
+        }
+
+        // record the end of the last row group
+        writer.mark_row_group();
+
+        let mut last_field_id = 4;

         if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
             last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
@@ -1306,6 +1547,32 @@ impl<'a> WriteThrift for FileMeta<'a> {
                 .write_thrift_field(writer, 9, last_field_id)?;
         }

+        let mut index = Vec::<u8>::new();
+        {
+            let mut w = ThriftCompactOutputProtocol::new(&mut index);
+            if let Some(meta_index) = writer.take_index() {
+                let idx = MetaIndex {
+                    schema_offset: meta_index.schema_range.start,
+                    schema_length: meta_index.schema_range.end - meta_index.schema_range.start,
+                    row_group_offsets: meta_index.row_group_offsets,
+                    column_offsets: meta_index.col_chunk_offsets,
+                    column_meta_lengths: meta_index.col_meta_lengths,
+                };
+                idx.write_thrift(&mut w)?;
+            }
+        }
+
+        if !index.is_empty() {
+            // write the footer for the index
+            // TODO(ets): this should use a UUID rather than a simple string, but it works for the prototype
+            let idx_len = index.len() as u64;
+            index.extend_from_slice(idx_len.as_bytes());
+            index.extend_from_slice("PARI".as_bytes());
+
+            // write the index as a binary extension field
+            index.as_slice().write_thrift_field(writer, 32767, 0)?;
+        }
+
         writer.write_struct_end()
     }
 }
@@ -1412,8 +1679,18 @@ impl WriteThrift for RowGroupMetaData {
     const ELEMENT_TYPE: ElementType = ElementType::Struct;

     fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
-        // this will call ColumnChunkMetaData::write_thrift
-        self.columns.write_thrift_field(writer, 1, 0)?;
+        // field 1 column chunks
+        writer.write_field_begin(FieldType::List, 1, 0)?;
+        writer.write_list_begin(ElementType::Struct, self.num_columns())?;
+        for col in self.columns() {
+            // save the offset for this column chunk
+            writer.mark_column_chunk();
+            col.write_thrift(writer)?;
+        }
+        // save the offset of the end of the last column chunk
+        writer.mark_column_chunk();
+
+        // write remaining fields
         self.total_byte_size.write_thrift_field(writer, 2, 1)?;
         let mut last_field_id = self.num_rows.write_thrift_field(writer, 3, 2)?;
         if let Some(sorting_columns) = self.sorting_columns() {
@@ -1462,7 +1739,10 @@ impl WriteThrift for ColumnChunkMetaData {
             // only write the ColumnMetaData if we haven't already encrypted it
             if self.encrypted_column_metadata.is_none() {
                 writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
+                // save the start of the `ColumnMetaData`
+                let col_meta_start = writer.bytes_written();
                 serialize_column_meta_data(self, writer)?;
+                writer.push_col_meta_length(writer.bytes_written() - col_meta_start);
                 last_field_id = 3;
             }
         }
@@ -1470,7 +1750,10 @@ impl WriteThrift for ColumnChunkMetaData {
         {
             // always write the ColumnMetaData
             writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
+            // save the start of the `ColumnMetaData`
+            let col_meta_start = writer.bytes_written();
             serialize_column_meta_data(self, writer)?;
+            writer.push_col_meta_length(writer.bytes_written() - col_meta_start);
             last_field_id = 3;
         }

@@ -1586,7 +1869,7 @@ pub(crate) mod tests {
         schema_descr: Arc<SchemaDescriptor>,
     ) -> Result<RowGroupMetaData> {
         let mut reader = ThriftSliceInputProtocol::new(buf);
-        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr)
+        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr, None)
     }

     pub(crate) fn read_column_chunk(
         buf: &mut Vec<u8>,
         column_descr: Arc<ColumnDescriptor>,
     ) -> Result<ColumnChunkMetaData> {
         let mut reader = ThriftSliceInputProtocol::new(buf);
-        crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr)
+        crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr, None)
     }

     pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
         let num_nodes = num_nodes(&schema)?;
         let mut buf = Vec::new();
-        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
+        {
+            let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

-        // kick off writing list
-        writer.write_list_begin(ElementType::Struct, num_nodes)?;
+            // kick off writing list
+            writer.write_list_begin(ElementType::Struct, num_nodes)?;

-        // write SchemaElements
-        write_schema(&schema, &mut writer)?;
+            // write SchemaElements
+            write_schema(&schema, &mut writer)?;
+        }

         let mut prot = ThriftSliceInputProtocol::new(&buf);
         let se: Vec<SchemaElement> = read_thrift_vec(&mut prot)?;
@@ -1616,13 +1901,15 @@ pub(crate) mod tests {
     pub(crate) fn schema_to_buf(schema: &TypePtr) -> Result<Vec<u8>> {
         let num_nodes = num_nodes(schema)?;
         let mut buf = Vec::new();
-        let mut writer = ThriftCompactOutputProtocol::new(&mut buf);
+        {
+            let mut writer = ThriftCompactOutputProtocol::new(&mut buf);

-        // kick off writing list
-        writer.write_list_begin(ElementType::Struct, num_nodes)?;
+            // kick off writing list
+            writer.write_list_begin(ElementType::Struct, num_nodes)?;

-        // write SchemaElements
-        write_schema(schema, &mut writer)?;
+            // write SchemaElements
+            write_schema(schema, &mut writer)?;
+        }

         Ok(buf)
     }
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index 124bc11bddc5..55751aabb900 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -554,8 +554,10 @@ impl MetadataObjectWriter {
             Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
                 // First write FileCryptoMetadata
                 let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
-                let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
-                crypto_metadata.write_thrift(&mut protocol)?;
+                {
+                    let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
+                    crypto_metadata.write_thrift(&mut protocol)?;
+                }

                 // Then write encrypted footer
                 let aad = create_footer_aad(file_encryptor.file_aad())?;
diff --git a/parquet/src/parquet_thrift.rs b/parquet/src/parquet_thrift.rs
index f9fa66ee0d3f..7c61cf0a827c 100644
--- a/parquet/src/parquet_thrift.rs
+++ b/parquet/src/parquet_thrift.rs
@@ -29,10 +29,12 @@ use std::{
     cmp::Ordering,
     io::{Read, Write},
+    ops::Range,
 };

 use crate::{
     errors::{ParquetError, Result},
+    file::writer::TrackedWrite,
     write_thrift_field,
 };
 use std::io::Error;
@@ -521,6 +523,12 @@ impl<'a> ThriftSliceInputProtocol<'a> {
     pub fn as_slice(&self) -> &'a [u8] {
         self.buf
     }
+
+    /// Reset the underlying slice.
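+    ///
+    /// Subsequent reads will start at the beginning of `buf`; any bytes remaining
+    /// in the previous slice are discarded.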
+    #[allow(dead_code)]
+    pub fn reset(&mut self, buf: &'a [u8]) {
+        self.buf = buf;
+    }
 }

 impl<'b, 'a: 'b> ThriftCompactInputProtocol<'b> for ThriftSliceInputProtocol<'a> {
@@ -687,6 +695,47 @@ where
     Ok(res)
 }

+/// Collects metadata index information during writes of the Parquet footer
+pub(crate) struct MetadataIndexBuilder {
+    start_offset: usize,
+    pub(crate) schema_range: Range<i64>,
+    pub(crate) row_group_offsets: Vec<i64>,
+    pub(crate) col_chunk_offsets: Vec<i64>,
+    pub(crate) col_meta_lengths: Vec<i64>,
+}
+
+impl MetadataIndexBuilder {
+    pub(crate) fn new(start_offset: usize, num_row_groups: usize, num_columns: usize) -> Self {
+        Self {
+            start_offset,
+            schema_range: Default::default(),
+            row_group_offsets: Vec::with_capacity(num_row_groups + 1),
+            col_chunk_offsets: Vec::with_capacity(num_row_groups * (num_columns + 1)),
+            col_meta_lengths: Vec::with_capacity(num_row_groups * num_columns),
+        }
+    }
+
+    pub(crate) fn set_schema_range(&mut self, start: usize, end: usize) {
+        let start = (start - self.start_offset) as i64;
+        let end = (end - self.start_offset) as i64;
+        self.schema_range = start..end;
+    }
+
+    pub(crate) fn push_row_group(&mut self, offset: usize) {
+        self.row_group_offsets
+            .push((offset - self.start_offset) as i64);
+    }
+
+    pub(crate) fn push_column(&mut self, offset: usize) {
+        self.col_chunk_offsets
+            .push((offset - self.start_offset) as i64);
+    }
+
+    pub(crate) fn push_col_meta_length(&mut self, length: usize) {
+        self.col_meta_lengths.push(length as i64);
+    }
+}
+
 /////////////////////////
 // thrift compact output

@@ -699,13 +748,69 @@ where
 ///
 /// [compact output]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
 pub(crate) struct ThriftCompactOutputProtocol<W: Write> {
-    writer: W,
+    writer: TrackedWrite<W>,
+    index: Option<MetadataIndexBuilder>,
 }

 impl<W: Write> ThriftCompactOutputProtocol<W> {
     /// Create a new `ThriftCompactOutputProtocol` wrapping the byte sink `writer`.
+    ///
+    /// This will first wrap `writer` in a [`TrackedWrite`]
     pub(crate) fn new(writer: W) -> Self {
-        Self { writer }
+        Self::new_tracked(TrackedWrite::new(writer))
+    }
+
+    /// Create a new [`ThriftCompactOutputProtocol`], wrapping the input [`TrackedWrite`].
+    pub(crate) fn new_tracked(writer: TrackedWrite<W>) -> Self {
+        Self {
+            writer,
+            index: None,
+        }
+    }
+
+    /// Add a `MetadataIndexBuilder` to this writer.
+    pub(crate) fn set_index(&mut self, index: MetadataIndexBuilder) {
+        self.index = Some(index);
+    }
+
+    /// Take the index from this writer, leaving `None` in its place.
+    pub(crate) fn take_index(&mut self) -> Option<MetadataIndexBuilder> {
+        self.index.take()
+    }
+
+    // pass-thru methods for the index
+
+    /// Set the start and end positions in the index for the schema.
+    pub(crate) fn set_schema_range(&mut self, start: usize, end: usize) {
+        if let Some(idx) = self.index.as_mut() {
+            idx.set_schema_range(start, end);
+        }
+    }
+
+    /// Call to mark the position where a row group begins (or ends, for the last row group).
+    pub(crate) fn mark_row_group(&mut self) {
+        if let Some(idx) = self.index.as_mut() {
+            idx.push_row_group(self.writer.bytes_written());
+        }
+    }
+
+    /// Call to mark the position where a column chunk begins (or ends, for the last column in a row group).
+    pub(crate) fn mark_column_chunk(&mut self) {
+        if let Some(idx) = self.index.as_mut() {
+            idx.push_column(self.writer.bytes_written());
+        }
+    }
+
+    /// Add a length to the list of `ColumnMetaData` encoded lengths.
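+    ///
+    /// This is a no-op when no index builder has been attached via [`Self::set_index`].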
+    pub(crate) fn push_col_meta_length(&mut self, length: usize) {
+        if let Some(idx) = self.index.as_mut() {
+            idx.push_col_meta_length(length);
+        }
+    }
+
+    /// Return the number of bytes written to the inner [`Write`]
+    pub(crate) fn bytes_written(&self) -> usize {
+        self.writer.bytes_written()
+    }

     /// Write a single byte to the output stream.
diff --git a/parquet/tests/arrow_reader/io/sync_reader.rs b/parquet/tests/arrow_reader/io/sync_reader.rs
index 77c200fa8641..973734d83c66 100644
--- a/parquet/tests/arrow_reader/io/sync_reader.rs
+++ b/parquet/tests/arrow_reader/io/sync_reader.rs
@@ -41,7 +41,7 @@ fn test_read_entire_file() {
         @r#"
     [
         "Footer: 8 bytes",
-        "Metadata: 1162",
+        "Metadata: 1222",
         "UNKNOWN: 22230..22877 (maybe Page Index)",
         "Event: Builder Configured",
         "Event: Reader Built",
@@ -77,7 +77,7 @@ fn test_read_single_group() {
         @r#"
     [
         "Footer: 8 bytes",
-        "Metadata: 1162",
+        "Metadata: 1222",
         "UNKNOWN: 22230..22877 (maybe Page Index)",
         "Event: Builder Configured",
         "Event: Reader Built",
@@ -105,7 +105,7 @@ fn test_read_single_column() {
         @r#"
     [
         "Footer: 8 bytes",
-        "Metadata: 1162",
+        "Metadata: 1222",
         "UNKNOWN: 22230..22877 (maybe Page Index)",
         "Event: Builder Configured",
         "Event: Reader Built",
@@ -134,7 +134,7 @@ fn test_read_single_column_no_page_index() {
         @r#"
     [
         "Footer: 8 bytes",
-        "Metadata: 1162",
+        "Metadata: 1222",
         "Event: Builder Configured",
         "Event: Reader Built",
         "Row Group 0, column 'b': DictionaryPage (17 bytes , 17 requests) [header]",
@@ -176,7 +176,7 @@ fn test_read_row_selection() {
         @r#"
     [
         "Footer: 8 bytes",
-        "Metadata: 1162",
+        "Metadata: 1222",
         "UNKNOWN: 22230..22877 (maybe Page Index)",
         "Event: Builder Configured",
         "Event: Reader Built",
@@ -207,7 +207,7 @@ fn test_read_limit() {
         @r#"
     [
         "Footer: 8 bytes",
-        "Metadata: 1162",
+        "Metadata: 1222",
         "UNKNOWN: 22230..22877 (maybe Page Index)",
         "Event: Builder Configured",
         "Event: Reader Built",
@@ -244,7 +244,7 @@ fn test_read_single_row_filter() {
         @r#"
     [
         "Footer: 8 bytes",
-        "Metadata: 1162",
+        "Metadata: 1222",
         "UNKNOWN: 22230..22877 (maybe Page Index)",
         "Event: Builder Configured",
         "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]",
@@ -295,7 +295,7 @@ fn test_read_multiple_row_filter() {
         @r#"
     [
         "Footer: 8 bytes",
-        "Metadata: 1162",
+        "Metadata: 1222",
         "UNKNOWN: 22230..22877 (maybe Page Index)",
         "Event: Builder Configured",
         "Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]",
@@ -341,7 +341,7 @@ fn test_read_single_row_filter_all() {
         @r#"
     [
         "Footer: 8 bytes",
-        "Metadata: 1162",
+        "Metadata: 1222",
         "UNKNOWN: 22230..22877 (maybe Page Index)",
         "Event: Builder Configured",
         "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]",