Skip to content

Commit d8ff8b3

Browse files
committed
Form key batches in columnar
Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
1 parent 5c61b40 commit d8ff8b3

File tree

2 files changed

+145
-7
lines changed

2 files changed

+145
-7
lines changed

differential-dataflow/examples/columnar.rs

Lines changed: 143 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use {
77
timely::dataflow::ProbeHandle,
88
};
99

10-
use differential_dataflow::trace::implementations::ord_neu::ColValSpine;
10+
use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;
1111

1212
use differential_dataflow::operators::arrange::arrangement::arrange_core;
1313

@@ -44,8 +44,8 @@ fn main() {
4444
let data_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::<u64>());
4545
let keys_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::<u64>());
4646

47-
let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColValSpine<_,_,_,_>>(&data, data_pact, "Data");
48-
let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColValSpine<_,_,_,_>>(&keys, keys_pact, "Keys");
47+
let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data");
48+
let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys");
4949

5050
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
5151
.probe_with(&probe);
@@ -595,16 +595,16 @@ pub mod dd_builder {
595595
use differential_dataflow::trace::implementations::Layout;
596596
use differential_dataflow::trace::implementations::Update;
597597
use differential_dataflow::trace::implementations::BatchContainer;
598-
use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, val_batch::OrdValStorage};
599-
598+
use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, val_batch::OrdValStorage, OrdKeyBatch};
599+
use differential_dataflow::trace::implementations::ord_neu::key_batch::OrdKeyStorage;
600600
use crate::Column;
601601

602602

603603
use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
604604
use differential_dataflow::trace::implementations::TStack;
605605

606606
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>>>;
607-
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdValBuilder<TStack<((K,()),T,R)>>>;
607+
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>>>;
608608

609609
// Utility types to save some typing and avoid visual noise. Extract the owned type and
610610
// the batch's read items from a layout.
@@ -776,4 +776,141 @@ pub mod dd_builder {
776776
builder.done(description)
777777
}
778778
}
779+
780+
/// A builder that accumulates update tuples into an `OrdKeyStorage`.
///
/// NOTE(review): `push` only compares each incoming key with the most recently pushed key,
/// so updates for the same key presumably must arrive adjacently -- confirm with callers.
781+
pub struct OrdKeyBuilder<L: Layout> {
782+
/// The in-progress result.
783+
///
784+
/// This is public to allow container implementors to set and inspect their container.
785+
pub result: OrdKeyStorage<L>,
786+
/// An update that was held back rather than pushed, because it exactly matched the most
/// recently pushed `(time, diff)` pair. See `push_update` for how it is set and recovered.
singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
787+
/// Counts the number of singleton optimizations we performed.
788+
///
789+
/// This number allows us to correctly gauge the total number of updates reflected in a batch,
790+
/// even though `result.times.len()` may be much shorter than this amount.
791+
singletons: usize,
792+
}
793+
794+
impl<L: Layout> OrdKeyBuilder<L> {
795+
/// Pushes a single update, which may set `self.singleton` rather than push.
796+
///
797+
/// This operation is meant to be equivalent to pushing `time` onto `self.result.times` and
/// `diff` onto `self.result.diffs`.
798+
/// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
799+
/// to encode a singleton update with an "absert" update: repeating the most recent offset.
800+
/// This otherwise invalid state encodes "look back one element".
801+
///
802+
/// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
803+
/// previously pushed update exactly. In that case, we do not push the update into `times` and `diffs`.
804+
/// The update tuple is retained in `self.singleton` in case we see another update and need
805+
/// to recover the singleton to push it into `times` and `diffs` to join the second update.
///
/// # Panics
///
/// Panics if the update matches the previously pushed update while a singleton is already pending.
806+
fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
807+
// If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
808+
if self.result.times.last().map(|t| t == ReadItemTime::<L>::borrow_as(&time)) == Some(true) &&
809+
self.result.diffs.last().map(|d| d == ReadItemDiff::<L>::borrow_as(&diff)) == Some(true)
810+
{
811+
// Only one update can be held back at a time.
assert!(self.singleton.is_none());
812+
self.singleton = Some((time, diff));
813+
}
814+
else {
815+
// If we have held back a single element, we need to copy it out to meet this one.
816+
if let Some((time, diff)) = self.singleton.take() {
817+
self.result.times.push(time);
818+
self.result.diffs.push(diff);
819+
}
820+
self.result.times.push(time);
821+
self.result.diffs.push(diff);
822+
}
823+
}
824+
}
825+
826+
// The layout `L` determines the key, val, time, and diff types.
827+
impl<L> Builder for OrdKeyBuilder<L>
828+
where
829+
L: Layout,
830+
OwnedKey<L>: Columnar,
831+
OwnedVal<L>: Columnar,
832+
OwnedTime<L>: Columnar,
833+
OwnedDiff<L>: Columnar,
834+
// These two constraints look like something we could potentially replace with `Columnar::Ref<'a>`.
835+
for<'a> L::KeyContainer: PushInto<&'a OwnedKey<L>>,
836+
for<'a> L::ValContainer: PushInto<&'a OwnedVal<L>>,
837+
for<'a> ReadItemTime<'a, L> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
838+
for<'a> ReadItemDiff<'a, L> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
839+
{
840+
type Input = Column<((OwnedKey<L>,OwnedVal<L>),OwnedTime<L>,OwnedDiff<L>)>;
841+
type Time = <L::Target as Update>::Time;
842+
type Output = OrdKeyBatch<L>;
843+
844+
/// Creates an empty builder with storage sized for `keys` keys and `upds` updates.
///
/// The `_vals` argument is ignored. The offset container reserves `keys + 1` slots.
fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
845+
// We don't introduce zero offsets as they will be introduced by the first `push` call.
846+
Self {
847+
result: OrdKeyStorage {
848+
keys: L::KeyContainer::with_capacity(keys),
849+
keys_offs: L::OffsetContainer::with_capacity(keys + 1),
850+
times: L::TimeContainer::with_capacity(upds),
851+
diffs: L::DiffContainer::with_capacity(upds),
852+
},
853+
singleton: None,
854+
singletons: 0,
855+
}
856+
}
857+
858+
/// Drains `chunk` and appends each `((key, _val), time, diff)` item to the in-progress storage.
///
/// The value component of each item is ignored. An item whose key equals the most recently
/// pushed key extends that key's updates; any other key records a new offset and a new key.
#[inline]
859+
fn push(&mut self, chunk: &mut Self::Input) {
860+
use timely::Container;
861+
862+
// NB: Maintaining an owned key and val across iterations to track the "last", which we clone into,
863+
// is somewhat appealing for ease of implementation. It might still allocate and do work we don't need,
864+
// but it avoids e.g. calls into `last()` and breaks horrid trait requirements.
865+
// The owned key and val would need to be members of `self`, as this method can be called multiple times,
866+
// and we would need to correctly cache "last" for reasons of correctness, not just performance.
// As written, the "last" key is read from `self.result.keys.last()`, and `owned_key` below is
// only a scratch value that each item's key is copied into.
867+
868+
let mut owned_key = None;
869+
870+
for ((key,_val),time,diff) in chunk.drain() {
871+
// Overwrite the existing owned key via `copy_from` when there is one; otherwise create it.
let key = if let Some(owned_key) = owned_key.as_mut() {
872+
OwnedKey::<L>::copy_from(owned_key, key);
873+
owned_key
874+
} else {
875+
owned_key.insert(OwnedKey::<L>::into_owned(key))
876+
};
877+
878+
let time = OwnedTime::<L>::into_owned(time);
879+
let diff = OwnedDiff::<L>::into_owned(diff);
880+
881+
// Perhaps this is a continuation of an already received key.
882+
if self.result.keys.last().map(|k| ReadItemKey::<L>::borrow_as(key).eq(&k)).unwrap_or(false) {
883+
self.push_update(time, diff);
884+
} else {
885+
// New key; complete representation of prior key by recording the current update count as an offset.
886+
self.result.keys_offs.push(self.result.times.len());
887+
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
888+
self.push_update(time, diff);
889+
self.result.keys.push(key);
890+
}
891+
}
892+
}
893+
894+
/// Completes the batch, consuming the builder.
///
/// The batch's `updates` count is the number of stored times plus the number of singleton
/// optimizations performed, as the latter are not present in the storage.
#[inline(never)]
895+
fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
896+
// Record the final offsets
897+
self.result.keys_offs.push(self.result.times.len());
898+
// Remove any pending singleton, and if it was set increment our count.
899+
if self.singleton.take().is_some() { self.singletons += 1; }
900+
OrdKeyBatch {
901+
updates: self.result.times.len() + self.singletons,
902+
storage: self.result,
903+
description,
904+
}
905+
}
906+
907+
/// Builds a batch from a chain of chunks.
///
/// Drains `chain`, pushes every chunk into a fresh zero-capacity builder, and finishes it
/// with `description`.
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
908+
let mut builder = Self::with_capacity(0, 0, 0);
909+
for mut chunk in chain.drain(..) {
910+
builder.push(&mut chunk);
911+
}
912+
913+
builder.done(description)
914+
}
915+
}
779916
}

differential-dataflow/src/trace/implementations/ord_neu.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,8 @@ pub mod val_batch {
677677
}
678678
}
679679

680-
mod key_batch {
680+
/// Types related to forming batches of keys.
681+
pub mod key_batch {
681682

682683
use std::marker::PhantomData;
683684
use serde::{Deserialize, Serialize};

0 commit comments

Comments
 (0)