diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index d52c7978..966c1baf 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -255,14 +255,88 @@ fn try_merge_collector_schemas( schema1: &CollectorSchema, schema2: &CollectorSchema, ) -> Result { - let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?; + let schema1_fields = &schema1.fields; + let schema2_fields = &schema2.fields; + + // Create a map from field name to index in schema1 + let field_map: HashMap = schema1_fields + .iter() + .enumerate() + .map(|(i, f)| (f.name.clone(), i)) + .collect(); + + let mut output_fields = Vec::new(); + let mut next_field_id_1 = 0; + let mut next_field_id_2 = 0; + + for (idx, field) in schema2_fields.iter().enumerate() { + if let Some(&idx1) = field_map.get(&field.name) { + if idx1 < next_field_id_1 { + api_bail!( + "Common fields are expected to have consistent order across different `collect()` calls, but got different orders between fields '{}' and '{}'", + field.name, + schema1_fields[next_field_id_1 - 1].name + ); + } + // Add intervening fields from schema1 + for i in next_field_id_1..idx1 { + output_fields.push(schema1_fields[i].clone()); + } + // Add intervening fields from schema2 + for i in next_field_id_2..idx { + output_fields.push(schema2_fields[i].clone()); + } + // Merge the field + let merged_type = + try_make_common_value_type(&schema1_fields[idx1].value_type, &field.value_type)?; + output_fields.push(FieldSchema { + name: field.name.clone(), + value_type: merged_type, + description: None, + }); + next_field_id_1 = idx1 + 1; + next_field_id_2 = idx + 1; + // Fields not in schema1 and not UUID are added at the end + } + } + + // Add remaining fields from schema1 + for i in next_field_id_1..schema1_fields.len() { + output_fields.push(schema1_fields[i].clone()); + } + + // Add remaining fields from schema2 + for i in next_field_id_2..schema2_fields.len() { + output_fields.push(schema2_fields[i].clone()); + } + + // Handle auto_uuid_field_idx + let auto_uuid_field_idx = match (schema1.auto_uuid_field_idx, schema2.auto_uuid_field_idx) { + (Some(idx1), Some(idx2)) => { + let name1 = &schema1_fields[idx1].name; + let name2 = &schema2_fields[idx2].name; + if name1 == name2 { + // Find the position of the auto_uuid field in the merged output + output_fields.iter().position(|f| &f.name == name1) + } else { + api_bail!( + "Generated UUID fields must have the same name across different `collect()` calls, got different names: '{}' vs '{}'", + name1, + name2 + ); + } + } + (Some(_), None) | (None, Some(_)) => { + api_bail!( + "The generated UUID field, once present for one `collect()`, must be consistently present for other `collect()` calls for the same collector" + ); + } + (None, None) => None, + }; + Ok(CollectorSchema { - fields, - auto_uuid_field_idx: if schema1.auto_uuid_field_idx == schema2.auto_uuid_field_idx { - schema1.auto_uuid_field_idx - } else { - None - }, + fields: output_fields, + auto_uuid_field_idx, }) } @@ -704,11 +778,14 @@ impl AnalyzerContext { op_scope: &Arc, reactive_op: &NamedSpec, ) -> Result>> { - let result_fut = match &reactive_op.spec { + let op_scope_clone = op_scope.clone(); + let reactive_op_clone = reactive_op.clone(); + let reactive_op_name = reactive_op.name.clone(); + let result_fut = match reactive_op_clone.spec { ReactiveOpSpec::Transform(op) => { let input_field_schemas = analyze_input_fields(&op.inputs, op_scope).with_context(|| { - format!("Preparing inputs for transform op: {}", reactive_op.name) + format!("Preparing inputs for transform op: {}", reactive_op_name) })?; let spec = serde_json::Value::Object(op.op.spec.clone()); @@ -725,8 +802,8 @@ impl AnalyzerContext { .with(&output_enriched_type.without_attrs())?; let output_type = output_enriched_type.typ.clone(); let output = - op_scope.add_op_output(reactive_op.name.clone(), output_enriched_type)?; - let op_name = reactive_op.name.clone(); + op_scope.add_op_output(reactive_op_name.clone(), output_enriched_type)?; + let op_name = reactive_op_name.clone(); async move { trace!("Start building executor for transform op `{op_name}`"); let executor = executor.await.with_context(|| { @@ -777,10 +854,10 @@ impl AnalyzerContext { .lock() .unwrap() .sub_scopes - .insert(reactive_op.name.clone(), Arc::new(sub_op_scope_schema)); + .insert(reactive_op_name.clone(), Arc::new(sub_op_scope_schema)); analyzed_op_scope_fut }; - let op_name = reactive_op.name.clone(); + let op_name = reactive_op_name.clone(); let concur_control_options = foreach_op.execution_options.get_concur_control_options(); @@ -800,22 +877,61 @@ impl AnalyzerContext { } ReactiveOpSpec::Collect(op) => { - let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, op_scope)?; + let (struct_mapping, fields_schema) = + analyze_struct_mapping(&op.input, &op_scope_clone)?; let has_auto_uuid_field = op.auto_uuid_field.is_some(); let fingerprinter = Fingerprinter::default().with(&fields_schema)?; - let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp { - name: reactive_op.name.clone(), - has_auto_uuid_field, - input: struct_mapping, - collector_ref: add_collector( - &op.scope_name, - op.collector_name.clone(), - CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()), - op_scope, - )?, - fingerprinter, - }); - async move { Ok(collect_op) }.boxed() + let input_field_names: Vec = + fields_schema.iter().map(|f| f.name.clone()).collect(); + let collector_ref = add_collector( + &op.scope_name, + op.collector_name.clone(), + CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()), + &op_scope_clone, + )?; + async move { + // Get the merged collector schema after adding + let collector_schema: Arc = { + let scope = find_scope(&op.scope_name, &op_scope_clone)?.1; + let states = scope.states.lock().unwrap(); + let collector = states.collectors.get(&op.collector_name).unwrap(); + collector.schema.clone() + }; + + // Pre-compute field index mappings for efficient evaluation + let field_name_to_index: HashMap<&FieldName, usize> = collector_schema + .fields + .iter() + .enumerate() + .map(|(i, f)| (&f.name, i)) + .collect(); + let mut field_index_mapping: HashMap = HashMap::new(); + for (input_idx, field_name) in input_field_names.iter().enumerate() { + let collector_idx = field_name_to_index + .get(field_name) + .copied() + .ok_or_else(|| { + anyhow!( + "field `{}` not found in merged collector schema", + field_name + ) + })?; + field_index_mapping.insert(collector_idx, input_idx); + } + + let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp { + name: reactive_op_name, + has_auto_uuid_field, + input: struct_mapping, + input_field_names, + collector_schema, + collector_ref, + field_index_mapping, + fingerprinter, + }); + Ok(collect_op) + } + .boxed() } }; Ok(result_fut) diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 3930d3b7..78e05354 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -1,6 +1,9 @@ use crate::base::schema::FieldSchema; +use crate::base::spec::FieldName; use crate::prelude::*; +use std::collections::HashMap; + use crate::ops::interface::*; use crate::utils::fingerprint::{Fingerprint, Fingerprinter}; @@ -90,7 +93,11 @@ pub struct AnalyzedCollectOp { pub name: String, pub has_auto_uuid_field: bool, pub input: AnalyzedStructMapping, + pub input_field_names: Vec, + pub collector_schema: Arc, pub collector_ref: AnalyzedCollectorReference, + /// Pre-computed mapping from collector field index to input field index. + pub field_index_mapping: HashMap, /// Fingerprinter of the collector's schema. Used to decide when to reuse auto-generated UUIDs. pub fingerprinter: Fingerprinter, } diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs index 0e4a920b..957e38d1 100644 --- a/src/execution/evaluator.rs +++ b/src/execution/evaluator.rs @@ -515,6 +515,41 @@ async fn evaluate_op_scope( let collector_entry = scoped_entries .headn(op.collector_ref.scope_up_level as usize) .ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?; + + // Assemble input values + let input_values: Vec = + assemble_input_values(&op.input.fields, scoped_entries) + .collect::>>()?; + + // Create field_values vector for all fields in the merged schema + let mut field_values: Vec = + vec![value::Value::Null; op.collector_schema.fields.len()]; + + // Use pre-computed field index mappings for O(1) field placement + for (&collector_idx, &input_idx) in op.field_index_mapping.iter() { + field_values[collector_idx] = input_values[input_idx].clone(); + } + + // Handle auto_uuid_field (assumed to be at position 0 for efficiency) + if op.has_auto_uuid_field { + if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx { + let uuid = memory.next_uuid( + op.fingerprinter + .clone() + .with( + &field_values + .iter() + .enumerate() + .filter(|(i, _)| *i != uuid_idx) + .map(|(_, v)| v) + .collect::>(), + )? + .into_fingerprint(), + )?; + field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid)); + } + } + { let mut collected_records = collector_entry.collected_values [op.collector_ref.local.collector_idx as usize]