From f0dc7505af68994360ded567c7e8ba1b89e4deae Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Fri, 19 Nov 2021 13:12:19 +0100 Subject: [PATCH 1/2] reduce_core: present referenced data to logic function `reduce_core` was previously forced to clone the output data before presenting it to the logic function for inspection. Simply annotating the type with a reference was not possible due to the buffer being held in the same struct as some of the data it contained, making it a self-referential struct which the borrow checker doesn't allow. However, the only reason this buffer was held in the same struct was to reuse its allocation. This patch therefore creates a brand new vector every time we need to process data that contains references instead of cloned data. The expectation is that in the general case it's better to allocate a vector of references than clone a bunch of values. Signed-off-by: Petros Angelatos --- src/operators/reduce.rs | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 135128efe..0ef4c8a39 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -281,7 +281,7 @@ pub trait ReduceCore where G::Timestam if !input.is_empty() { logic(key, input, change); } - change.extend(output.drain(..).map(|(x,d)| (x, d.negate()))); + change.extend(output.drain(..).map(|(x,d)| (x.clone(), d.negate()))); crate::consolidation::consolidate(change); }) } @@ -298,7 +298,7 @@ pub trait ReduceCore where G::Timestam T2::R: Semigroup, T2::Batch: Batch, T2::Cursor: Cursor, - L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static + L: FnMut(&K, &[(&V, R)], &mut Vec<(&T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static ; } @@ -317,7 +317,7 @@ where T2: Trace+TraceReader+'static, T2::Batch: Batch, T2::Cursor: Cursor, - L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static + L: FnMut(&K, &[(&V, 
R)], &mut Vec<(&T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { self.arrange_by_key_named(&format!("Arrange: {}", name)) .reduce_core(name, logic) @@ -338,7 +338,7 @@ where T2::R: Semigroup, T2::Batch: Batch, T2::Cursor: Cursor, - L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { + L: FnMut(&K, &[(&V, R)], &mut Vec<(&T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { let mut result_trace = None; @@ -676,7 +676,7 @@ where C1: Cursor, C2: Cursor, C3: Cursor, - L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>); + L: FnMut(&K, &[(&V1, R1)], &mut Vec<(&V2, R2)>, &mut Vec<(V2, R2)>); } @@ -705,7 +705,7 @@ mod history_replay { input_history: ValueHistory<'a, V1, T, R1>, output_history: ValueHistory<'a, V2, T, R2>, input_buffer: Vec<(&'a V1, R1)>, - output_buffer: Vec<(V2, R2)>, + output_buffer_capacity: usize, update_buffer: Vec<(V2, R2)>, output_produced: Vec<((V2, T), R2)>, synth_times: Vec, @@ -728,7 +728,7 @@ mod history_replay { input_history: ValueHistory::new(), output_history: ValueHistory::new(), input_buffer: Vec::new(), - output_buffer: Vec::new(), + output_buffer_capacity: 0, update_buffer: Vec::new(), output_produced: Vec::new(), synth_times: Vec::new(), @@ -754,7 +754,7 @@ mod history_replay { C1: Cursor, C2: Cursor, C3: Cursor, - L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>) + L: FnMut(&K, &[(&V1, R1)], &mut Vec<(&V2, R2)>, &mut Vec<(V2, R2)>) { // The work we need to perform is at times defined principally by the contents of `batch_cursor` @@ -913,10 +913,11 @@ mod history_replay { } crate::consolidation::consolidate(&mut self.input_buffer); + let mut output_buffer = Vec::with_capacity(self.output_buffer_capacity); meet.as_ref().map(|meet| output_replay.advance_buffer_by(&meet)); - for &((ref value, ref time), ref diff) in output_replay.buffer().iter() { + for &((value, ref time), ref diff) in output_replay.buffer().iter() { if time.less_equal(&next_time) { - 
self.output_buffer.push(((*value).clone(), diff.clone())); + output_buffer.push((value, diff.clone())); } else { self.temporary.push(next_time.join(time)); @@ -924,19 +925,19 @@ mod history_replay { } for &((ref value, ref time), ref diff) in self.output_produced.iter() { if time.less_equal(&next_time) { - self.output_buffer.push(((*value).clone(), diff.clone())); + output_buffer.push((value, diff.clone())); } else { self.temporary.push(next_time.join(&time)); } } - crate::consolidation::consolidate(&mut self.output_buffer); + crate::consolidation::consolidate(&mut output_buffer); // Apply user logic if non-empty input and see what happens! - if self.input_buffer.len() > 0 || self.output_buffer.len() > 0 { - logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer); + if self.input_buffer.len() > 0 || output_buffer.len() > 0 { + logic(key, &self.input_buffer[..], &mut output_buffer, &mut self.update_buffer); self.input_buffer.clear(); - self.output_buffer.clear(); + self.output_buffer_capacity = output_buffer.capacity(); } // output_replay.advance_buffer_by(&meet); From 3d625d2d7fa8d691358ca22381ffaa707bb493fe Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Fri, 19 Nov 2021 13:51:44 +0100 Subject: [PATCH 2/2] reduce: reuse the output buffer allocation This patch brings back the benefits of re-using the output buffer allocation at the cost of some unsafe code reasoning. Since the buffer is always cleared after each use we can simply store its type erased pointer and temporarily bring it back to its Vec form whenever there is some processing to do. 
Signed-off-by: Petros Angelatos --- src/operators/reduce.rs | 51 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 0ef4c8a39..b538e3503 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -683,6 +683,7 @@ where /// Implementation based on replaying historical and new updates together. mod history_replay { + use std::mem::ManuallyDrop; use ::difference::Semigroup; use lattice::Lattice; use trace::Cursor; @@ -691,6 +692,13 @@ mod history_replay { use super::{PerKeyCompute, sort_dedup}; + /// Clears and type erases a vector + fn vec_to_parts(v: Vec) -> (*mut (), usize) { + let mut v = ManuallyDrop::new(v); + v.clear(); + (v.as_mut_ptr() as *mut (), v.capacity()) + } + /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in /// time order, maintaining consolidated representations of updates with respect to future interesting times. pub struct HistoryReplayer<'a, V1, V2, T, R1, R2> @@ -705,7 +713,11 @@ mod history_replay { input_history: ValueHistory<'a, V1, T, R1>, output_history: ValueHistory<'a, V2, T, R2>, input_buffer: Vec<(&'a V1, R1)>, - output_buffer_capacity: usize, + // A type erased pointer and capacity for the temporary output buffer passed to `logic`. + // During `compute` the vector contains references to self which will get invalid as soon + // as compute returns. 
For this reason the temporary vector is always cleared before + // decomposing it back into its type erased parts + output_buffer_parts: (*mut (), usize), update_buffer: Vec<(V2, R2)>, output_produced: Vec<((V2, T), R2)>, synth_times: Vec, @@ -728,7 +740,7 @@ mod history_replay { input_history: ValueHistory::new(), output_history: ValueHistory::new(), input_buffer: Vec::new(), - output_buffer_capacity: 0, + output_buffer_parts: vec_to_parts(Vec::<(&V2, R2)>::new()), update_buffer: Vec::new(), output_produced: Vec::new(), synth_times: Vec::new(), @@ -913,7 +925,16 @@ mod history_replay { } crate::consolidation::consolidate(&mut self.input_buffer); - let mut output_buffer = Vec::with_capacity(self.output_buffer_capacity); + let (ptr, cap) = self.output_buffer_parts; + // SAFETY: + // * `ptr` is valid because it has been previously allocated by a Vec + // constructor parameterized with the same type argument + // * `len` and `cap` are valid because the vector is converted to parts + // only through `vec_to_parts` which clears the vector and gets its + // capacity + let mut output_buffer = unsafe { + Vec::from_raw_parts(ptr as *mut (&V2, R2), 0, cap) + }; meet.as_ref().map(|meet| output_replay.advance_buffer_by(&meet)); for &((value, ref time), ref diff) in output_replay.buffer().iter() { if time.less_equal(&next_time) { @@ -937,8 +958,8 @@ mod history_replay { if self.input_buffer.len() > 0 || output_buffer.len() > 0 { logic(key, &self.input_buffer[..], &mut output_buffer, &mut self.update_buffer); self.input_buffer.clear(); - self.output_buffer_capacity = output_buffer.capacity(); } + self.output_buffer_parts = vec_to_parts(output_buffer); // output_replay.advance_buffer_by(&meet); // for &((ref value, ref time), diff) in output_replay.buffer().iter() { @@ -1077,6 +1098,28 @@ mod history_replay { } } + impl<'a, V1, V2, T, R1, R2> Drop for HistoryReplayer<'a, V1, V2, T, R1, R2> + where + V1: Ord+Clone+'a, + V2: Ord+Clone+'a, + T: Lattice+Ord+Clone, + R1: Semigroup, + 
R2: Semigroup, + { + fn drop(&mut self) { + let (ptr, cap) = self.output_buffer_parts; + // SAFETY: + // * `ptr` is valid because it has been previously allocated by a Vec + // constructor parameterized with the same type argument + // * `len` and `cap` are valid because the vector is converted to parts + // only through `vec_to_parts` which clears the vector and gets its + // capacity + unsafe { + Vec::from_raw_parts(ptr as *mut (&V2, R2), 0, cap); + } + } + } + /// Updates an optional meet by an optional time. fn update_meet(meet: &mut Option, other: Option<&T>) { if let Some(time) = other {