@@ -683,6 +683,7 @@ where
683683/// Implementation based on replaying historical and new updates together.
684684mod history_replay {
685685
686+ use std:: mem:: ManuallyDrop ;
686687 use :: difference:: Semigroup ;
687688 use lattice:: Lattice ;
688689 use trace:: Cursor ;
@@ -691,6 +692,13 @@ mod history_replay {
691692
692693 use super :: { PerKeyCompute , sort_dedup} ;
693694
695+ /// Clears and type erases a vector
696+ fn vec_to_parts < T > ( v : Vec < T > ) -> ( * mut ( ) , usize ) {
697+ let mut v = ManuallyDrop :: new ( v) ;
698+ v. clear ( ) ;
699+ ( v. as_mut_ptr ( ) as * mut ( ) , v. capacity ( ) )
700+ }
701+
694702 /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
695703 /// time order, maintaining consolidated representations of updates with respect to future interesting times.
696704 pub struct HistoryReplayer < ' a , V1 , V2 , T , R1 , R2 >
@@ -705,7 +713,11 @@ mod history_replay {
705713 input_history : ValueHistory < ' a , V1 , T , R1 > ,
706714 output_history : ValueHistory < ' a , V2 , T , R2 > ,
707715 input_buffer : Vec < ( & ' a V1 , R1 ) > ,
708- output_buffer_capacity : usize ,
716+ // A type erased pointer and capacity for the temporary output buffer passed to `logic`.
717+ // During `compute` the vector contains references to self which will get invalid as soon
718+ // as compute returns. For this reason the temporary vector is always cleared before
719+ // decomposing it back into its type erased parts
720+ output_buffer_parts : ( * mut ( ) , usize ) ,
709721 update_buffer : Vec < ( V2 , R2 ) > ,
710722 output_produced : Vec < ( ( V2 , T ) , R2 ) > ,
711723 synth_times : Vec < T > ,
@@ -728,7 +740,7 @@ mod history_replay {
728740 input_history : ValueHistory :: new ( ) ,
729741 output_history : ValueHistory :: new ( ) ,
730742 input_buffer : Vec :: new ( ) ,
731- output_buffer_capacity : 0 ,
743+ output_buffer_parts : vec_to_parts ( Vec :: < ( & V2 , R2 ) > :: new ( ) ) ,
732744 update_buffer : Vec :: new ( ) ,
733745 output_produced : Vec :: new ( ) ,
734746 synth_times : Vec :: new ( ) ,
@@ -913,7 +925,16 @@ mod history_replay {
913925 }
914926 crate :: consolidation:: consolidate ( & mut self . input_buffer ) ;
915927
916- let mut output_buffer = Vec :: with_capacity ( self . output_buffer_capacity ) ;
928+ let ( ptr, cap) = self . output_buffer_parts ;
929+ // SAFETY:
930+ // * `ptr` is valid because is has been previously allocated by a Vec
931+ // constructor parameterized with the same type argument
932+ // * `len` and `cap` are valid because the vector is converted to parts
933+ // only through `vec_to_parts` which clears the vector and gets its
934+ // capacity
935+ let mut output_buffer = unsafe {
936+ Vec :: from_raw_parts ( ptr as * mut ( & V2 , R2 ) , 0 , cap)
937+ } ;
917938 meet. as_ref ( ) . map ( |meet| output_replay. advance_buffer_by ( & meet) ) ;
918939 for & ( ( value, ref time) , ref diff) in output_replay. buffer ( ) . iter ( ) {
919940 if time. less_equal ( & next_time) {
@@ -937,8 +958,8 @@ mod history_replay {
937958 if self . input_buffer . len ( ) > 0 || output_buffer. len ( ) > 0 {
938959 logic ( key, & self . input_buffer [ ..] , & mut output_buffer, & mut self . update_buffer ) ;
939960 self . input_buffer . clear ( ) ;
940- self . output_buffer_capacity = output_buffer. capacity ( ) ;
941961 }
962+ self . output_buffer_parts = vec_to_parts ( output_buffer) ;
942963
943964 // output_replay.advance_buffer_by(&meet);
944965 // for &((ref value, ref time), diff) in output_replay.buffer().iter() {
@@ -1077,6 +1098,28 @@ mod history_replay {
10771098 }
10781099 }
10791100
1101+ impl < ' a , V1 , V2 , T , R1 , R2 > Drop for HistoryReplayer < ' a , V1 , V2 , T , R1 , R2 >
1102+ where
1103+ V1 : Ord +Clone +' a ,
1104+ V2 : Ord +Clone +' a ,
1105+ T : Lattice +Ord +Clone ,
1106+ R1 : Semigroup ,
1107+ R2 : Semigroup ,
1108+ {
1109+ fn drop ( & mut self ) {
1110+ let ( ptr, cap) = self . output_buffer_parts ;
1111+ // SAFETY:
1112+ // * `ptr` is valid because is has been previously allocated by a Vec
1113+ // constructor parameterized with the same type argument
1114+ // * `len` and `cap` are valid because the vector is converted to parts
1115+ // only through `vec_to_parts` which clears the vector and gets its
1116+ // capacity
1117+ unsafe {
1118+ Vec :: from_raw_parts ( ptr as * mut ( & V2 , R2 ) , 0 , cap) ;
1119+ }
1120+ }
1121+ }
1122+
10801123 /// Updates an optional meet by an optional time.
10811124 fn update_meet < T : Lattice +Clone > ( meet : & mut Option < T > , other : Option < & T > ) {
10821125 if let Some ( time) = other {
0 commit comments