diff --git a/Cargo.toml b/Cargo.toml index 76e16fe..1987e13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,8 @@ license = "MIT" [dependencies] jemallocator = "0.1.8" timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", features = ["bincode"] } -differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" } +# differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" } +differential-dataflow = {path = "/Users/david/Projects/David/differential-dataflow/"} abomonation = "0.7" abomonation_derive = "0.3" serde = "1" diff --git a/src/plan/aggregate_monoid.rs b/src/plan/aggregate_monoid.rs new file mode 100644 index 0000000..e114199 --- /dev/null +++ b/src/plan/aggregate_monoid.rs @@ -0,0 +1,182 @@ +//! Aggregate expression plan using Diffvector and Monoids + +use differential_dataflow::difference::Monoid; +use std::ops::AddAssign; + +use timely::dataflow::scopes::child::Iterative; +use timely::dataflow::Scope; +use timely::order::TotalOrder; +use timely::progress::Timestamp; + +use differential_dataflow::difference::{DiffPair, DiffVector}; +use differential_dataflow::lattice::Lattice; +use differential_dataflow::operators::Join as JoinMap; +use differential_dataflow::operators::{Consolidate, Count, Reduce, Threshold}; + +use crate::binding::Binding; +use crate::plan::{Dependencies, ImplContext, Implementable}; +use crate::{CollectionRelation, Relation, ShutdownHandle, Value, Var, VariableMap}; + +use num_rational::{Ratio, Rational32}; + +/// Permitted aggregation function. +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub enum AggregationFn { + /// Count + COUNT, + /// Sum + SUM, +} + +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +struct Max { + /// Associated value + pub value: u32, +} + +impl<'a> AddAssign<&'a Self> for Max { + fn add_assign(&mut self, rhs: &Max) { + *self = Max { + value: std::cmp::max(self.value, rhs.value), + } + } +} + +impl Monoid for Max { + fn zero() -> Max { + Max { value: 0 } + } +} + +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +enum Diff { + Maximum(Max), + Sum(i64), +} + +// impl<'a> AddAssign<&'a Self> for Diff { +// fn add_assign(&mut self, rhs: &Self) { +// match self { +// Diff::Maximum(max) => max.add_assign(rhs), +// Diff::Sum(sum) => sum.add_assign(rhs), +// } +// } +// } + +// impl Monoid for Diff { +// fn zero() { +// match self { +// Diff::Maximum(max) => max.zero(), +// Diff::Sum(sum) => 0, +// } +// } +// } + +/// [WIP] A plan stage applying the specified aggregation functions to +/// bindings for the specified variables. Given multiple aggregations +/// we iterate and n-1 joins are applied to the results. +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct Aggregate { + /// TODO + pub variables: Vec, + /// Plan for the data source. + pub plan: Box

, + /// Logical predicate to apply. + pub aggregation_fns: Vec, + /// Relation variables that determine the grouping. + pub key_variables: Vec, + /// Aggregation variables + pub aggregation_variables: Vec, + /// With variables + pub with_variables: Vec, +} + +impl Implementable for Aggregate

{ + fn dependencies(&self) -> Dependencies { + self.plan.dependencies() + } + + fn into_bindings(&self) -> Vec { + self.plan.into_bindings() + } + + fn implement<'b, T, I, S>( + &self, + nested: &mut Iterative<'b, S, u64>, + local_arrangements: &VariableMap>, + context: &mut I, + ) -> (CollectionRelation<'b, S>, ShutdownHandle) + where + T: Timestamp + Lattice + TotalOrder, + I: ImplContext, + S: Scope, + { + let (relation, shutdown_handle) = self.plan.implement(nested, local_arrangements, context); + + // We split the incoming tuples into their (key, value) parts. + let tuples = relation.tuples_by_variables(&self.key_variables); + + // For each aggregation function that is to be applied, we + // need to determine the index (into the value part of each + // tuple) at which its argument is to be found. + + let mut value_offsets = Vec::new(); + let mut seen = Vec::new(); + + for variable in self.aggregation_variables.iter() { + if !seen.contains(&variable) { + seen.push(&variable); + value_offsets.push(seen.len() - 1); + } else { + value_offsets.push(seen.iter().position(|&v| variable == v).unwrap()); + } + } + + // Users can specify weird find clauses like [:find ?key1 (min ?v1) ?key2] + // and we would like to avoid an extra projection. Thus, we pre-compute + // the correct output offset for each aggregation. + + let mut variables = self.variables.clone(); + let mut output_offsets = Vec::new(); + + for variable in self.aggregation_variables.iter() { + let output_index = variables.iter().position(|&v| *variable == v).unwrap(); + output_offsets.push(output_index); + + variables[output_index] = 0; + } + let agg_fns = self.aggregation_fns.clone(); + ( + CollectionRelation { + variables: self.variables.to_vec(), + tuples: tuples + .explode(move |(key, values)| { + let mut v = Vec::with_capacity(agg_fns.len()); + for (agg, index) in agg_fns.iter().zip(value_offsets.clone()) { + v.push(match agg { + AggregationFn::COUNT => match values[index] { + Value::Number(_) => 1 as isize, + _ => panic!("Wow... this I can't do!"), + }, + AggregationFn::SUM => match values[index] { + Value::Number(val) => val as isize, + _ => panic!("Wow... this I can't do!"), + }, + }) + } + Some((key, DiffVector::new(v)) + }) + .count() + .map(|(key, vals)| { + let mut v = key.clone(); + let mut diffs: Vec = + vals.into_iter().map(|x| Value::Number(x as i64)).collect(); + v.append(&mut diffs); + v + }) + , + }, + shutdown_handle, + ) + } +} diff --git a/src/plan/mod.rs b/src/plan/mod.rs index e5b2e40..013a349 100644 --- a/src/plan/mod.rs +++ b/src/plan/mod.rs @@ -22,7 +22,7 @@ use crate::{ #[cfg(feature = "set-semantics")] pub mod aggregate; #[cfg(not(feature = "set-semantics"))] -pub mod aggregate_neu; +pub mod aggregate_monoid; pub mod antijoin; pub mod filter; pub mod hector; @@ -35,7 +35,7 @@ pub mod union; #[cfg(feature = "set-semantics")] pub use self::aggregate::{Aggregate, AggregationFn}; #[cfg(not(feature = "set-semantics"))] -pub use self::aggregate_neu::{Aggregate, AggregationFn}; +pub use self::aggregate_monoid::{Aggregate, AggregationFn}; pub use self::antijoin::Antijoin; pub use self::filter::{Filter, Predicate}; pub use self::hector::Hector; diff --git a/tests/aggregation_test.rs b/tests/aggregation_test.rs index acf54fd..e50bc47 100644 --- a/tests/aggregation_test.rs +++ b/tests/aggregation_test.rs @@ -110,385 +110,12 @@ fn run_cases(mut cases: Vec) { } } -#[test] -fn count() { - let (e, amount) = (1, 2); - let data = vec![ - TxData(1, 1, ":amount".to_string(), Number(5)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 1, ":amount".to_string(), Number(2)), - TxData(1, 1, ":amount".to_string(), Number(4)), - TxData(1, 1, ":amount".to_string(), Number(6)), - ]; - - run_cases(vec![ - Case { - description: "[:find (count ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![amount], - plan: Box::new(Plan::Project(Project { - variables: vec![amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - })), - aggregation_fns: vec![AggregationFn::COUNT], - key_variables: vec![], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![(vec![Number(6)], 0, 1)]], - // set-semantics - // expectations: vec![ - // vec![(vec![Number(5)], 0, 1)], - // ], - }, - Case { - description: "[:find ?e (count ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![e, amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - aggregation_fns: vec![AggregationFn::COUNT], - key_variables: vec![e], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![ - (vec![Eid(1), Number(4)], 0, 1), - (vec![Eid(2), Number(2)], 0, 1), - ]], - // set-semantics - // expectations: vec![ - // vec![ - // (vec![Eid(1), Number(4)], 0, 1), - // (vec![Eid(2), Number(1)], 0, 1), - // ], - // ], - }, - ]); -} - -#[test] -fn max() { - let (e, amount) = (1, 2); - let data = vec![ - TxData(1, 1, ":amount".to_string(), Number(5)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 1, ":amount".to_string(), Number(2)), - TxData(1, 1, ":amount".to_string(), Number(4)), - TxData(1, 1, ":amount".to_string(), Number(6)), - ]; - - run_cases(vec![ - Case { - description: "[:find (max ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![amount], - plan: Box::new(Plan::Project(Project { - variables: vec![amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - })), - aggregation_fns: vec![AggregationFn::MAX], - key_variables: vec![], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![(vec![Number(10)], 0, 1)]], - }, - Case { - description: "[:find ?e (max ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![e, amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - aggregation_fns: vec![AggregationFn::MAX], - key_variables: vec![e], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![ - (vec![Eid(1), Number(6)], 0, 1), - (vec![Eid(2), Number(10)], 0, 1), - ]], - }, - ]); -} - -#[test] -fn min() { - let (e, amount) = (1, 2); - let data = vec![ - TxData(1, 1, ":amount".to_string(), Number(5)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 1, ":amount".to_string(), Number(2)), - TxData(1, 1, ":amount".to_string(), Number(4)), - TxData(1, 1, ":amount".to_string(), Number(6)), - ]; - - run_cases(vec![ - Case { - description: "[:find (min ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![amount], - plan: Box::new(Plan::Project(Project { - variables: vec![amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - })), - aggregation_fns: vec![AggregationFn::MIN], - key_variables: vec![], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![(vec![Number(2)], 0, 1)]], - }, - Case { - description: "[:find ?e (min ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![e, amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - aggregation_fns: vec![AggregationFn::MIN], - key_variables: vec![e], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![ - (vec![Eid(1), Number(2)], 0, 1), - (vec![Eid(2), Number(10)], 0, 1), - ]], - }, - ]); -} - -#[test] -fn sum() { - let (e, amount) = (1, 2); - let data = vec![ - TxData(1, 1, ":amount".to_string(), Number(5)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 1, ":amount".to_string(), Number(2)), - TxData(1, 1, ":amount".to_string(), Number(4)), - TxData(1, 1, ":amount".to_string(), Number(6)), - ]; - - run_cases(vec![ - Case { - description: "[:find (sum ?amount) :with ?e :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![amount], - plan: Box::new(Plan::Project(Project { - variables: vec![amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - })), - aggregation_fns: vec![AggregationFn::SUM], - key_variables: vec![], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![(vec![Number(37)], 0, 1)]], - // set-semantics - // expectations: vec![ - // vec![(vec![Number(27)], 0, 1)], - // ], - }, - Case { - description: "[:find ?e (sum ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![e, amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - aggregation_fns: vec![AggregationFn::SUM], - key_variables: vec![e], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![ - (vec![Eid(1), Number(17)], 0, 1), - (vec![Eid(2), Number(20)], 0, 1), - ]], - // set-semantics - // expectations: vec![ - // vec![ - // (vec![Eid(1), Number(17)], 0, 1), - // (vec![Eid(2), Number(10)], 0, 1), - // ], - // ], - }, - ]); -} - -#[test] -fn avg() { - let (e, amount) = (1, 2); - let data = vec![ - TxData(1, 1, ":amount".to_string(), Number(5)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 1, ":amount".to_string(), Number(2)), - TxData(1, 1, ":amount".to_string(), Number(4)), - TxData(1, 1, ":amount".to_string(), Number(6)), - ]; - - run_cases(vec![ - Case { - description: "[:find (avg ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![amount], - plan: Box::new(Plan::Project(Project { - variables: vec![amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - })), - aggregation_fns: vec![AggregationFn::AVG], - key_variables: vec![], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![(vec![Rational32(Ratio::new(37, 6))], 0, 1)]], - // set-semantics - // expectations: vec![ - // vec![(vec![Rational32(Ratio::new(27, 5))], 0, 1)], - // ], - }, - Case { - description: "[:find ?e (avg ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![e, amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - aggregation_fns: vec![AggregationFn::AVG], - key_variables: vec![e], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![ - (vec![Eid(1), Rational32(Ratio::new(17, 4))], 0, 1), - (vec![Eid(2), Rational32(Ratio::new(20, 2))], 0, 1), - ]], - // set-semantics - // expectations: vec![ - // vec![ - // (vec![Eid(1), Rational32(Ratio::new(17, 4))], 0, 1), - // (vec![Eid(2), Rational32(Ratio::new(10, 1))], 0, 1), - // ], - // ], - }, - ]); -} - -#[test] -fn variance() { - let (e, amount) = (1, 2); - let data = vec![ - TxData(1, 1, ":amount".to_string(), Number(5)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 1, ":amount".to_string(), Number(2)), - TxData(1, 1, ":amount".to_string(), Number(4)), - TxData(1, 1, ":amount".to_string(), Number(6)), - ]; - - run_cases(vec![ - Case { - description: "[:find (variance ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![amount], - plan: Box::new(Plan::Project(Project { - variables: vec![amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - })), - aggregation_fns: vec![AggregationFn::VARIANCE], - key_variables: vec![], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![(vec![Rational32(Ratio::new(317, 36))], 0, 1)]], - // set-semantics - // expectations: vec![ - // vec![(vec![Rational32(Ratio::new(176, 25))], 0, 1)], - // ], - }, - Case { - description: "[:find ?e (variance ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![e, amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - aggregation_fns: vec![AggregationFn::VARIANCE], - key_variables: vec![e], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![ - (vec![Eid(1), Rational32(Ratio::new(35, 16))], 0, 1), - (vec![Eid(2), Rational32(Ratio::new(0, 1))], 0, 1), - ]], - }, - ]); -} - -#[test] -fn median() { - let (e, amount) = (1, 2); - let data = vec![ - TxData(1, 1, ":amount".to_string(), Number(5)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 2, ":amount".to_string(), Number(10)), - TxData(1, 1, ":amount".to_string(), Number(2)), - TxData(1, 1, ":amount".to_string(), Number(4)), - TxData(1, 1, ":amount".to_string(), Number(6)), - ]; - - run_cases(vec![ - Case { - description: "[:find (median ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![amount], - plan: Box::new(Plan::Project(Project { - variables: vec![amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - })), - aggregation_fns: vec![AggregationFn::MEDIAN], - key_variables: vec![], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![(vec![Number(5)], 0, 1)]], - }, - Case { - description: "[:find ?e (median ?amount) :where [?e :amount ?amount]]", - plan: Plan::Aggregate(Aggregate { - variables: vec![e, amount], - plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - aggregation_fns: vec![AggregationFn::MEDIAN], - key_variables: vec![e], - aggregation_variables: vec![amount], - with_variables: vec![], - }), - transactions: vec![data.clone()], - expectations: vec![vec![ - (vec![Eid(1), Number(5)], 0, 1), - (vec![Eid(2), Number(10)], 0, 1), - ]], - }, - ]); -} - #[test] fn multiple_aggregations() { run_cases(vec![ Case { description: - "[:find (max ?amount) (min ?debt) (sum ?amount) (avg ?debt) \ + "[:find (sum ?amount) (sum ?debt) (count ?amount) (count ?debt) \ :where [?e :amount ?amount][?e :debt ?debt]]", plan: { let (e, amount, debt) = (1, 2, 3); @@ -503,10 +130,10 @@ fn multiple_aggregations() { })), })), aggregation_fns: vec![ - AggregationFn::MAX, - AggregationFn::MIN, AggregationFn::SUM, - AggregationFn::AVG, + AggregationFn::SUM, + AggregationFn::COUNT, + AggregationFn::COUNT, ], key_variables: vec![], aggregation_variables: vec![amount, debt, amount, debt], @@ -516,24 +143,14 @@ fn multiple_aggregations() { transactions: vec![ vec![ TxData(1, 1, ":amount".to_string(), Number(5)), - TxData(1, 1, ":amount".to_string(), Number(2)), - TxData(1, 1, ":amount".to_string(), Number(6)), - TxData(1, 1, ":amount".to_string(), Number(9)), - TxData(1, 1, ":amount".to_string(), Number(10)), - TxData(1, 1, ":debt".to_string(), Number(13)), - TxData(1, 1, ":debt".to_string(), Number(4)), - TxData(1, 1, ":debt".to_string(), Number(9)), - TxData(1, 1, ":debt".to_string(), Number(15)), - TxData(1, 1, ":debt".to_string(), Number(10)), - TxData(1, 2, ":amount".to_string(), Number(2)), - TxData(1, 2, ":amount".to_string(), Number(4)), - TxData(1, 2, ":debt".to_string(), Number(5)), - TxData(1, 2, ":debt".to_string(), Number(42)), + TxData(1, 2, ":amount".to_string(), Number(5)), + TxData(1, 1, ":debt".to_string(), Number(12)), + TxData(1, 2, ":debt".to_string(), Number(15)), ], ], expectations: vec![ vec![ - (vec![Number(10), Number(4), Number(172), Rational32(Ratio::new(349, 29))], 0, 1), + (vec![Number(10), Number(12)], 0, 1), ], ], // set-semantics @@ -543,107 +160,5 @@ fn multiple_aggregations() { // ], // ], }, - Case { - description: - "[:find ?e (min ?amount) (max ?amount) (median ?amount) (count ?amount) (min ?debt) (max ?debt) (median ?debt) (count ?debt) \ - :where [?e :amount ?amount][?e :debt ?debt]]", - plan: { - let (e, amount, debt) = (1, 2, 3); - Plan::Aggregate(Aggregate { - variables: vec![e, amount, amount, amount, amount, debt, debt, debt, debt], - plan: Box::new(Plan::Project(Project { - variables: vec![e, amount, debt], - plan: Box::new(Plan::Join(Join { - variables: vec![e], - left_plan: Box::new(Plan::MatchA(e, ":amount".to_string(), amount)), - right_plan: Box::new(Plan::MatchA(e, ":debt".to_string(), debt)), - })), - })), - aggregation_fns: vec![ - AggregationFn::MIN, - AggregationFn::MAX, - AggregationFn::MEDIAN, - AggregationFn::COUNT, - AggregationFn::MIN, - AggregationFn::MAX, - AggregationFn::MEDIAN, - AggregationFn::COUNT, - ], - key_variables: vec![e], - aggregation_variables: vec![amount, amount, amount, amount, debt, debt, debt, debt], - with_variables: vec![], - }) - }, - transactions: vec![ - vec![ - TxData(1, 1, ":amount".to_string(), Number(5)), - TxData(1, 1, ":amount".to_string(), Number(2)), - TxData(1, 1, ":amount".to_string(), Number(6)), - TxData(1, 1, ":amount".to_string(), Number(9)), - TxData(1, 1, ":amount".to_string(), Number(10)), - TxData(1, 1, ":debt".to_string(), Number(13)), - TxData(1, 1, ":debt".to_string(), Number(4)), - TxData(1, 1, ":debt".to_string(), Number(9)), - TxData(1, 1, ":debt".to_string(), Number(15)), - TxData(1, 1, ":debt".to_string(), Number(10)), - TxData(1, 2, ":amount".to_string(), Number(2)), - TxData(1, 2, ":amount".to_string(), Number(4)), - TxData(1, 2, ":debt".to_string(), Number(5)), - TxData(1, 2, ":debt".to_string(), Number(42)), - ], - ], - expectations: vec![ - vec![ - (vec![Eid(1), Number(2), Number(10), Number(6), Number(25), Number(4), Number(15), Number(10), Number(25)], 0, 1), - (vec![Eid(2), Number(2), Number(4), Number(4), Number(4), Number(5), Number(42), Number(42), Number(4)], 0, 1), - ], - ], - // set-semantics - // expectations: vec![ - // vec![ - // (vec![Eid(1), Number(2), Number(10), Number(6), Number(5), Number(4), Number(15), Number(10), Number(5)], 0, 1), - // (vec![Eid(2), Number(2), Number(4), Number(4), Number(2), Number(5), Number(42), Number(42), Number(2)], 0, 1), - // ], - // ], - }, - Case { - description: - "[:find (sum ?heads) \ - :with ?monster \ - :where [?e :monster ?monster] [?e :head ?head]]", - plan: { - let (e, monster, heads) = (1, 2, 3); - Plan::Aggregate(Aggregate { - variables: vec![heads], - plan: Box::new(Plan::Project(Project { - variables: vec![heads, monster], - plan: Box::new(Plan::Join(Join { - variables: vec![e], - left_plan: Box::new(Plan::MatchA(e, ":monster".to_string(), monster)), - right_plan: Box::new(Plan::MatchA(e, ":heads".to_string(), heads)), - })), - })), - aggregation_fns: vec![AggregationFn::SUM], - key_variables: vec![], - aggregation_variables: vec![heads], - with_variables: vec![monster], - }) - }, - transactions: vec![ - vec![ - TxData(1, 1, ":monster".to_string(), String("Cerberus".to_string())), - TxData(1, 1, ":heads".to_string(), Number(3)), - TxData(1, 2, ":monster".to_string(), String("Medusa".to_string())), - TxData(1, 2, ":heads".to_string(), Number(1)), - TxData(1, 3, ":monster".to_string(), String("Cyclops".to_string())), - TxData(1, 3, ":heads".to_string(), Number(1)), - TxData(1, 4, ":monster".to_string(), String("Chimera".to_string())), - TxData(1, 4, ":heads".to_string(), Number(1)), - ], - ], - expectations: vec![ - vec![(vec![Number(6)], 0, 1)], - ], - }, ]); }