Skip to content

Commit c1c43fc

Browse files
committed
feat: collector automatically merge and align multiple collect() called with different schema
1 parent 4577f0c commit c1c43fc

File tree

3 files changed

+134
-29
lines changed

3 files changed

+134
-29
lines changed

src/builder/analyzer.rs

Lines changed: 92 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -255,14 +255,69 @@ fn try_merge_collector_schemas(
255255
schema1: &CollectorSchema,
256256
schema2: &CollectorSchema,
257257
) -> Result<CollectorSchema> {
258-
let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
258+
// Union all fields from both schemas
259+
let mut field_map: HashMap<FieldName, EnrichedValueType> = HashMap::new();
260+
261+
// Add fields from schema1
262+
for field in &schema1.fields {
263+
field_map.insert(field.name.clone(), field.value_type.clone());
264+
}
265+
266+
// Merge fields from schema2
267+
for field in &schema2.fields {
268+
if let Some(existing_type) = field_map.get(&field.name) {
269+
// Try to merge types if they differ
270+
let merged_type = try_make_common_value_type(existing_type, &field.value_type)?;
271+
field_map.insert(field.name.clone(), merged_type);
272+
} else {
273+
field_map.insert(field.name.clone(), field.value_type.clone());
274+
}
275+
}
276+
277+
// Sort fields by name for consistent ordering, but prioritize UUID fields
278+
let mut fields: Vec<FieldSchema> = field_map
279+
.into_iter()
280+
.map(|(name, value_type)| FieldSchema {
281+
name,
282+
value_type,
283+
description: None,
284+
})
285+
.collect();
286+
287+
// Prioritize UUID fields by placing them at the beginning for efficiency
288+
fields.sort_by(|a, b| {
289+
let a_is_uuid = matches!(a.value_type.typ, ValueType::Basic(BasicValueType::Uuid));
290+
let b_is_uuid = matches!(b.value_type.typ, ValueType::Basic(BasicValueType::Uuid));
291+
292+
match (a_is_uuid, b_is_uuid) {
293+
(true, false) => std::cmp::Ordering::Less, // UUID fields first
294+
(false, true) => std::cmp::Ordering::Greater, // UUID fields first
295+
_ => a.name.cmp(&b.name), // Then alphabetical
296+
}
297+
});
298+
299+
// Handle auto_uuid_field_idx (UUID fields are now at position 0 for efficiency)
300+
let auto_uuid_field_idx = match (schema1.auto_uuid_field_idx, schema2.auto_uuid_field_idx) {
301+
(Some(idx1), Some(idx2)) => {
302+
let name1 = &schema1.fields[idx1].name;
303+
let name2 = &schema2.fields[idx2].name;
304+
if name1 == name2 {
305+
// UUID fields are prioritized to position 0, so check if first field is UUID
306+
if fields.first().map_or(false, |f| matches!(f.value_type.typ, ValueType::Basic(BasicValueType::Uuid))) {
307+
Some(0)
308+
} else {
309+
fields.iter().position(|f| &f.name == name1)
310+
}
311+
} else {
312+
None // Different auto_uuid fields, disable
313+
}
314+
}
315+
_ => None, // If either doesn't have it, or both don't, disable
316+
};
317+
259318
Ok(CollectorSchema {
260319
fields,
261-
auto_uuid_field_idx: if schema1.auto_uuid_field_idx == schema2.auto_uuid_field_idx {
262-
schema1.auto_uuid_field_idx
263-
} else {
264-
None
265-
},
320+
auto_uuid_field_idx,
266321
})
267322
}
268323

@@ -803,16 +858,41 @@ impl AnalyzerContext {
803858
let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, op_scope)?;
804859
let has_auto_uuid_field = op.auto_uuid_field.is_some();
805860
let fingerprinter = Fingerprinter::default().with(&fields_schema)?;
861+
let input_field_names: Vec<FieldName> = fields_schema.iter().map(|f| f.name.clone()).collect();
862+
let collector_ref = add_collector(
863+
&op.scope_name,
864+
op.collector_name.clone(),
865+
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
866+
op_scope,
867+
)?;
868+
// Get the merged collector schema after adding
869+
let collector_schema: Arc<CollectorSchema> = {
870+
let scope = find_scope(&op.scope_name, op_scope)?.1;
871+
let states = scope.states.lock().unwrap();
872+
let collector = states.collectors.get(&op.collector_name).unwrap();
873+
collector.schema.clone()
874+
};
875+
876+
// Pre-compute field index mappings for efficient evaluation
877+
let field_index_mapping: Vec<usize> = input_field_names
878+
.iter()
879+
.map(|field_name| {
880+
collector_schema
881+
.fields
882+
.iter()
883+
.position(|f| &f.name == field_name)
884+
.unwrap_or(usize::MAX)
885+
})
886+
.collect();
887+
806888
let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
807889
name: reactive_op.name.clone(),
808890
has_auto_uuid_field,
809891
input: struct_mapping,
810-
collector_ref: add_collector(
811-
&op.scope_name,
812-
op.collector_name.clone(),
813-
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
814-
op_scope,
815-
)?,
892+
input_field_names,
893+
collector_schema,
894+
collector_ref,
895+
field_index_mapping,
816896
fingerprinter,
817897
});
818898
async move { Ok(collect_op) }.boxed()

src/builder/plan.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::base::schema::FieldSchema;
2+
use crate::base::spec::FieldName;
23
use crate::prelude::*;
34

45
use crate::ops::interface::*;
@@ -90,7 +91,12 @@ pub struct AnalyzedCollectOp {
9091
pub name: String,
9192
pub has_auto_uuid_field: bool,
9293
pub input: AnalyzedStructMapping,
94+
pub input_field_names: Vec<FieldName>,
95+
pub collector_schema: Arc<schema::CollectorSchema>,
9396
pub collector_ref: AnalyzedCollectorReference,
97+
/// Pre-computed mapping from input field index to collector field index.
98+
/// For missing fields, the value is usize::MAX.
99+
pub field_index_mapping: Vec<usize>,
94100
/// Fingerprinter of the collector's schema. Used to decide when to reuse auto-generated UUIDs.
95101
pub fingerprinter: Fingerprinter,
96102
}

src/execution/evaluator.rs

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -483,26 +483,45 @@ async fn evaluate_op_scope(
483483
}
484484

485485
AnalyzedReactiveOp::Collect(op) => {
486-
let mut field_values = Vec::with_capacity(
487-
op.input.fields.len() + if op.has_auto_uuid_field { 1 } else { 0 },
488-
);
489-
let field_values_iter = assemble_input_values(&op.input.fields, scoped_entries);
490-
if op.has_auto_uuid_field {
491-
field_values.push(value::Value::Null);
492-
field_values.extend(field_values_iter);
493-
let uuid = memory.next_uuid(
494-
op.fingerprinter
495-
.clone()
496-
.with(&field_values[1..])?
497-
.into_fingerprint(),
498-
)?;
499-
field_values[0] = value::Value::Basic(value::BasicValue::Uuid(uuid));
500-
} else {
501-
field_values.extend(field_values_iter);
502-
};
503486
let collector_entry = scoped_entries
504487
.headn(op.collector_ref.scope_up_level as usize)
505488
.ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?;
489+
490+
// Assemble input values
491+
let input_values: Vec<value::Value> =
492+
assemble_input_values(&op.input.fields, scoped_entries).collect();
493+
494+
// Create field_values vector for all fields in the merged schema
495+
let mut field_values: Vec<value::Value> =
496+
vec![value::Value::Null; op.collector_schema.fields.len()];
497+
498+
// Use pre-computed field index mappings for O(1) field placement
499+
for (i, &collector_field_idx) in op.field_index_mapping.iter().enumerate() {
500+
if collector_field_idx != usize::MAX {
501+
field_values[collector_field_idx] = input_values[i].clone();
502+
}
503+
}
504+
505+
// Handle auto_uuid_field (assumed to be at position 0 for efficiency)
506+
if op.has_auto_uuid_field {
507+
if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx {
508+
let uuid = memory.next_uuid(
509+
op.fingerprinter
510+
.clone()
511+
.with(
512+
&field_values
513+
.iter()
514+
.enumerate()
515+
.filter(|(i, _)| *i != uuid_idx)
516+
.map(|(_, v)| v)
517+
.collect::<Vec<_>>(),
518+
)?
519+
.into_fingerprint(),
520+
)?;
521+
field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid));
522+
}
523+
}
524+
506525
{
507526
let mut collected_records = collector_entry.collected_values
508527
[op.collector_ref.local.collector_idx as usize]

0 commit comments

Comments
 (0)