Skip to content

Commit f0dc750

Browse files
committed
reduce_core: present referenced data to logic function
`reduce_core` was previously forced to clone the output data before presenting it to the logic function for inspection. Simply annotating the type with a reference was not possible due to the buffer being held in the same struct as some of the data it contained, making it a self-referencial struct which the borrow checker doesn't allow. However, the only reason this buffer was held in the same struct was to reuse its allocation. This patch therefore creates a brand new vector every time we need to process data that contains references instead of cloned data. The expectation is that in the general case it's better to allocate a vector of references than clone a bunch of values. Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent c696ec0 commit f0dc750

File tree

1 file changed

+16
-15
lines changed

1 file changed

+16
-15
lines changed

src/operators/reduce.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
281281
if !input.is_empty() {
282282
logic(key, input, change);
283283
}
284-
change.extend(output.drain(..).map(|(x,d)| (x, d.negate())));
284+
change.extend(output.drain(..).map(|(x,d)| (x.clone(), d.negate())));
285285
crate::consolidation::consolidate(change);
286286
})
287287
}
@@ -298,7 +298,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
298298
T2::R: Semigroup,
299299
T2::Batch: Batch<K, T2::Val, G::Timestamp, T2::R>,
300300
T2::Cursor: Cursor<K, T2::Val, G::Timestamp, T2::R>,
301-
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static
301+
L: FnMut(&K, &[(&V, R)], &mut Vec<(&T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static
302302
;
303303
}
304304

@@ -317,7 +317,7 @@ where
317317
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
318318
T2::Batch: Batch<K, T2::Val, G::Timestamp, T2::R>,
319319
T2::Cursor: Cursor<K, T2::Val, G::Timestamp, T2::R>,
320-
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static
320+
L: FnMut(&K, &[(&V, R)], &mut Vec<(&T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static
321321
{
322322
self.arrange_by_key_named(&format!("Arrange: {}", name))
323323
.reduce_core(name, logic)
@@ -338,7 +338,7 @@ where
338338
T2::R: Semigroup,
339339
T2::Batch: Batch<K, T2::Val, G::Timestamp, T2::R>,
340340
T2::Cursor: Cursor<K, T2::Val, G::Timestamp, T2::R>,
341-
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static {
341+
L: FnMut(&K, &[(&V, R)], &mut Vec<(&T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static {
342342

343343
let mut result_trace = None;
344344

@@ -676,7 +676,7 @@ where
676676
C1: Cursor<K, V1, T, R1>,
677677
C2: Cursor<K, V2, T, R2>,
678678
C3: Cursor<K, V1, T, R1>,
679-
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>);
679+
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(&V2, R2)>, &mut Vec<(V2, R2)>);
680680
}
681681

682682

@@ -705,7 +705,7 @@ mod history_replay {
705705
input_history: ValueHistory<'a, V1, T, R1>,
706706
output_history: ValueHistory<'a, V2, T, R2>,
707707
input_buffer: Vec<(&'a V1, R1)>,
708-
output_buffer: Vec<(V2, R2)>,
708+
output_buffer_capacity: usize,
709709
update_buffer: Vec<(V2, R2)>,
710710
output_produced: Vec<((V2, T), R2)>,
711711
synth_times: Vec<T>,
@@ -728,7 +728,7 @@ mod history_replay {
728728
input_history: ValueHistory::new(),
729729
output_history: ValueHistory::new(),
730730
input_buffer: Vec::new(),
731-
output_buffer: Vec::new(),
731+
output_buffer_capacity: 0,
732732
update_buffer: Vec::new(),
733733
output_produced: Vec::new(),
734734
synth_times: Vec::new(),
@@ -754,7 +754,7 @@ mod history_replay {
754754
C1: Cursor<K, V1, T, R1>,
755755
C2: Cursor<K, V2, T, R2>,
756756
C3: Cursor<K, V1, T, R1>,
757-
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>)
757+
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(&V2, R2)>, &mut Vec<(V2, R2)>)
758758
{
759759

760760
// The work we need to perform is at times defined principally by the contents of `batch_cursor`
@@ -913,30 +913,31 @@ mod history_replay {
913913
}
914914
crate::consolidation::consolidate(&mut self.input_buffer);
915915

916+
let mut output_buffer = Vec::with_capacity(self.output_buffer_capacity);
916917
meet.as_ref().map(|meet| output_replay.advance_buffer_by(&meet));
917-
for &((ref value, ref time), ref diff) in output_replay.buffer().iter() {
918+
for &((value, ref time), ref diff) in output_replay.buffer().iter() {
918919
if time.less_equal(&next_time) {
919-
self.output_buffer.push(((*value).clone(), diff.clone()));
920+
output_buffer.push((value, diff.clone()));
920921
}
921922
else {
922923
self.temporary.push(next_time.join(time));
923924
}
924925
}
925926
for &((ref value, ref time), ref diff) in self.output_produced.iter() {
926927
if time.less_equal(&next_time) {
927-
self.output_buffer.push(((*value).clone(), diff.clone()));
928+
output_buffer.push((value, diff.clone()));
928929
}
929930
else {
930931
self.temporary.push(next_time.join(&time));
931932
}
932933
}
933-
crate::consolidation::consolidate(&mut self.output_buffer);
934+
crate::consolidation::consolidate(&mut output_buffer);
934935

935936
// Apply user logic if non-empty input and see what happens!
936-
if self.input_buffer.len() > 0 || self.output_buffer.len() > 0 {
937-
logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer);
937+
if self.input_buffer.len() > 0 || output_buffer.len() > 0 {
938+
logic(key, &self.input_buffer[..], &mut output_buffer, &mut self.update_buffer);
938939
self.input_buffer.clear();
939-
self.output_buffer.clear();
940+
self.output_buffer_capacity = output_buffer.capacity();
940941
}
941942

942943
// output_replay.advance_buffer_by(&meet);

0 commit comments

Comments
 (0)