Skip to content

Commit cbaab6f

Browse files
committed
fetch
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 6ad2e8b commit cbaab6f

1 file changed

Lines changed: 146 additions & 0 deletions

File tree

vortex-scan/src/fetch_plan.rs

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ pub(crate) enum MaterializationPlan {
3838
pub(crate) struct DeferredMaterializationPlan {
3939
final_fields: FieldNames,
4040
immediate_fields: FieldNames,
41+
immediate_field_masks: Vec<FieldMask>,
42+
immediate_fetch_row_bytes: usize,
4143
deferred_groups: Vec<DeferredFieldGroup>,
4244
}
4345

@@ -95,8 +97,10 @@ impl MaterializationPlan {
9597
}
9698

9799
let mut immediate = Vec::new();
100+
let mut immediate_field_masks = Vec::new();
98101
let mut deferred_groups = Vec::new();
99102
let mut immediate_carry_cost = 0usize;
103+
let mut immediate_fetch_cost = 0usize;
100104
let mut deferred_carry_cost = 0usize;
101105

102106
for name in final_fields.iter() {
@@ -128,7 +132,12 @@ impl MaterializationPlan {
128132
} else {
129133
immediate_carry_cost =
130134
immediate_carry_cost.saturating_add(carry_cost_bytes_per_row);
135+
immediate_fetch_cost =
136+
immediate_fetch_cost.saturating_add(carry_cost_bytes_per_row);
131137
immediate.push(name.clone());
138+
immediate_field_masks.push(FieldMask::Prefix(FieldPath::from(Field::Name(
139+
name.clone(),
140+
))));
132141
}
133142
}
134143

@@ -150,6 +159,8 @@ impl MaterializationPlan {
150159
Self::Deferred(DeferredMaterializationPlan {
151160
final_fields,
152161
immediate_fields: FieldNames::from(immediate),
162+
immediate_field_masks,
163+
immediate_fetch_row_bytes: immediate_fetch_cost,
153164
deferred_groups,
154165
})
155166
}
@@ -171,6 +182,13 @@ impl MaterializationPlan {
171182
),
172183
Self::Deferred(plan) => {
173184
let mut hints = Vec::new();
185+
if plan.immediate_fetch_row_bytes > 0 {
186+
hints.extend(reader.projection_fetch_hints(
187+
plan.immediate_field_masks.clone(),
188+
row_range.clone(),
189+
plan.immediate_fetch_row_bytes,
190+
)?);
191+
}
174192
for group in &plan.deferred_groups {
175193
hints.extend(reader.projection_fetch_hints(
176194
group.field_masks.clone(),
@@ -322,9 +340,13 @@ fn estimate_dtype_row_bytes(dtype: &DType) -> usize {
322340
#[cfg(test)]
323341
mod tests {
324342
use std::collections::BTreeSet;
343+
use std::ops::Range;
344+
use std::sync::Arc;
325345

346+
use vortex_array::MaskFuture;
326347
use vortex_array::dtype::DType;
327348
use vortex_array::dtype::Field;
349+
use vortex_array::dtype::FieldName;
328350
use vortex_array::dtype::FieldMask;
329351
use vortex_array::dtype::FieldNames;
330352
use vortex_array::dtype::FieldPath;
@@ -333,6 +355,10 @@ mod tests {
333355
use vortex_array::dtype::StructFields;
334356
use vortex_array::expr::root;
335357
use vortex_array::expr::select;
358+
use vortex_error::VortexResult;
359+
use vortex_layout::ArrayFuture;
360+
use vortex_layout::LayoutReader;
361+
use vortex_mask::Mask;
336362

337363
use super::MaterializationPlan;
338364
use super::estimate_field_mask_row_bytes;
@@ -359,6 +385,75 @@ mod tests {
359385
)
360386
}
361387

388+
fn field_mask(name: &str) -> FieldMask {
389+
FieldMask::Prefix(FieldPath::from(Field::Name(name.into())))
390+
}
391+
392+
struct HintOnlyReader {
393+
name: Arc<str>,
394+
dtype: DType,
395+
}
396+
397+
impl HintOnlyReader {
398+
fn new(dtype: DType) -> Self {
399+
Self {
400+
name: Arc::from("hint-only-reader"),
401+
dtype,
402+
}
403+
}
404+
}
405+
406+
impl LayoutReader for HintOnlyReader {
407+
fn name(&self) -> &Arc<str> {
408+
&self.name
409+
}
410+
411+
fn dtype(&self) -> &DType {
412+
&self.dtype
413+
}
414+
415+
fn row_count(&self) -> u64 {
416+
1024
417+
}
418+
419+
fn register_splits(
420+
&self,
421+
_field_mask: &[FieldMask],
422+
row_range: &Range<u64>,
423+
splits: &mut BTreeSet<u64>,
424+
) -> VortexResult<()> {
425+
splits.insert(row_range.end);
426+
Ok(())
427+
}
428+
429+
fn pruning_evaluation(
430+
&self,
431+
_row_range: &Range<u64>,
432+
_expr: &vortex_array::expr::Expression,
433+
_mask: Mask,
434+
) -> VortexResult<MaskFuture> {
435+
unimplemented!()
436+
}
437+
438+
fn filter_evaluation(
439+
&self,
440+
_row_range: &Range<u64>,
441+
_expr: &vortex_array::expr::Expression,
442+
_mask: MaskFuture,
443+
) -> VortexResult<MaskFuture> {
444+
unimplemented!()
445+
}
446+
447+
fn projection_evaluation(
448+
&self,
449+
_row_range: &Range<u64>,
450+
_expr: &vortex_array::expr::Expression,
451+
_mask: MaskFuture,
452+
) -> VortexResult<ArrayFuture> {
453+
unimplemented!()
454+
}
455+
}
456+
362457
#[test]
363458
fn deferred_plan_activates_for_narrow_filtered_projection() {
364459
let projection = select(["id", "payload"], root());
@@ -432,4 +527,55 @@ mod tests {
432527
> estimate_field_mask_row_bytes(&dtype, &id_mask)
433528
);
434529
}
530+
531+
#[test]
532+
fn deferred_fetch_hints_include_non_filter_immediate_fields() {
533+
let dtype = scan_dtype();
534+
let reader = HintOnlyReader::new(dtype.clone());
535+
let projection = select(["id", "payload"], root());
536+
let projection_masks = vec![field_mask("id"), field_mask("payload")];
537+
let plan = MaterializationPlan::from_projection(
538+
&projection,
539+
&dtype,
540+
true,
541+
&projection_masks,
542+
&BTreeSet::from([FieldName::from("score")]),
543+
);
544+
545+
let hints = plan
546+
.fetch_hints(&reader, &projection_masks, &(0..10))
547+
.expect("fetch hints");
548+
549+
let total_bytes = hints.iter().map(|hint| hint.estimated_fetch_bytes).sum::<usize>();
550+
let expected_immediate = estimate_field_mask_row_bytes(&dtype, &[field_mask("id")]);
551+
let expected_deferred = estimate_field_mask_row_bytes(&dtype, &[field_mask("payload")]);
552+
553+
assert_eq!(hints.len(), 2);
554+
assert_eq!(total_bytes, 10 * (expected_immediate + expected_deferred));
555+
}
556+
557+
#[test]
558+
fn deferred_fetch_hints_do_not_double_count_filter_shared_immediate_fields() {
559+
let dtype = scan_dtype();
560+
let reader = HintOnlyReader::new(dtype.clone());
561+
let projection = select(["id", "payload"], root());
562+
let projection_masks = vec![field_mask("id"), field_mask("payload")];
563+
let plan = MaterializationPlan::from_projection(
564+
&projection,
565+
&dtype,
566+
true,
567+
&projection_masks,
568+
&BTreeSet::from([FieldName::from("id")]),
569+
);
570+
571+
let hints = plan
572+
.fetch_hints(&reader, &projection_masks, &(0..10))
573+
.expect("fetch hints");
574+
575+
let total_bytes = hints.iter().map(|hint| hint.estimated_fetch_bytes).sum::<usize>();
576+
let expected_deferred = estimate_field_mask_row_bytes(&dtype, &[field_mask("payload")]);
577+
578+
assert_eq!(hints.len(), 1);
579+
assert_eq!(total_bytes, 10 * expected_deferred);
580+
}
435581
}

0 commit comments

Comments
 (0)