Signed-off-by: Nicholas Gates <nick@nickgates.com>
    let agg = &options.aggregate_fn;

    // Try encoding-specific fast path first.
    if let Some(states) = list.elements().aggregate_list(&list, agg)? {
    let list = args.inputs[0].to_listview()?;
    let agg = &options.aggregate_fn;

    // Try encoding-specific fast path first.
Where does the encoding-specific stuff happen?
    fn accumulate_list(&mut self, list: &ListViewArray) -> VortexResult<()> {
        for i in 0..list.len() {
            self.accumulate(&list.list_elements_at(i)?)?;
            self.flush()?;
        }
        Ok(())
    }
I think we might want to use an array + offset + len approach to avoid constructing a list at each step?
What do you mean by each step?
I was thinking that as you do pushdown or reduce, you will need to unwrap the elements, unwrap an encoding, and wrap that up with offset + len.
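A minimal sketch of the array + offset + len idea discussed above, using plain slices rather than Vortex arrays. `ListSlice` and `sum_lists` are hypothetical names for illustration only; the point is that each group borrows a window of the shared elements buffer instead of materializing a new list per iteration.

```rust
/// Hypothetical borrowed view of one list: a window into a shared elements buffer.
/// This is an illustration, not a Vortex type.
#[derive(Clone, Copy)]
pub struct ListSlice<'a> {
    elements: &'a [i64],
    offset: usize,
    len: usize,
}

impl<'a> ListSlice<'a> {
    /// The elements belonging to this list, borrowed from the shared buffer.
    pub fn values(&self) -> &'a [i64] {
        &self.elements[self.offset..self.offset + self.len]
    }
}

/// Sum every list described by (offset, size) pairs without building a
/// per-list array; each group only borrows a window of `elements`.
pub fn sum_lists(elements: &[i64], offsets: &[usize], sizes: &[usize]) -> Vec<i64> {
    offsets
        .iter()
        .zip(sizes)
        .map(|(&offset, &len)| {
            let view = ListSlice { elements, offset, len };
            view.values().iter().sum::<i64>()
        })
        .collect()
}
```

With elements `[1, 2, 3, 4, 5]`, offsets `[0, 2, 2]`, and sizes `[2, 0, 3]`, this yields one sum per list, including a zero for the empty list.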
    **Self-reduce** (`ScalarFnVTable::reduce`): constant list folding, count from list sizes,
    min/max from statistics, sum of constant elements.
What method on `Aggregate` is called here?
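To make two of the self-reduce cases named in the hunk concrete (count from list sizes, sum of constant elements), here is a rough sketch over plain slices. The function names are hypothetical and do not correspond to any method in the PR; it also assumes there are no null elements, so a list's count equals its size.

```rust
/// Count per list read straight from the list sizes; elements are never touched.
/// Assumes no null elements, so the count equals the list length.
pub fn count_from_sizes(sizes: &[u32]) -> Vec<u64> {
    sizes.iter().map(|&s| u64::from(s)).collect()
}

/// Sum per list when every element is the same constant: constant * size.
/// Returns `None` if any product overflows `i64`.
pub fn sum_of_constant(constant: i64, sizes: &[u32]) -> Option<Vec<i64>> {
    sizes
        .iter()
        .map(|&s| constant.checked_mul(i64::from(s)))
        .collect()
}
```

Neither function reads the elements buffer, which is what makes these cases candidates for a reduce rule rather than an accumulate pass.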
    **Parent-reduce** (encoding-specific): child encodings match on `ExactScalarFn<ListAggregate>`
    to optimize specific aggregate + encoding combinations. For example:

    - **Dict**: `ListAggregate(Min/Max, List(Dict(codes, values)))` pushes down to values.
What check is applied for this?
    like `list_sum`, `list_min`, etc. don't yet exist — and implementing each one separately would
    duplicate the underlying aggregation logic.

    The key observation is that a list column stored as `(offsets, elements)` is a pre-materialized
    infrastructure enables serializing intermediate state for distributed execution. A
    `state()` export method on `Accumulator` would complete this.

    - **Aggregate push-down in Scan**: using reduce rules to push aggregates into `LayoutReader`,
So for now `ListScalarFn` can't be pushed into scan either, until we figure out how to push down aggregates into scans.
I think we can? This comment is more about `SELECT MIN(a + 10) FROM .../foo.vortex` being pushed into Vortex.
    /// Merge a partial state scalar into the current group state.
    fn merge(
        &self, options: &Self::Options, state: &mut Self::GroupState, partial: &Scalar,
    ) -> VortexResult<()>;
Why do you define merge in this way? It could be `(GroupState, GroupState) -> GroupState`.
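For comparison, the two shapes of `merge` under discussion could look roughly like the sketch below. `SumState`, `merge_states`, and `merge_in_place` are hypothetical stand-ins; in particular, the in-place form here takes an already decoded partial, whereas the signature in the diff takes a serialized `&Scalar`.

```rust
/// Hypothetical partial state for a nullable sum: `None` means no value seen yet.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct SumState(pub Option<i64>);

/// Binary form suggested in review: (GroupState, GroupState) -> GroupState.
pub fn merge_states(a: SumState, b: SumState) -> SumState {
    match (a.0, b.0) {
        (Some(x), Some(y)) => SumState(Some(x + y)),
        (x, None) => SumState(x),
        (None, y) => SumState(y),
    }
}

/// In-place form, closer to the `merge` in the diff: fold a partial into `state`.
/// The partial is already decoded here; the diff passes a serialized scalar instead.
pub fn merge_in_place(state: &mut SumState, partial: SumState) {
    *state = merge_states(*state, partial);
}
```

The binary form is easier to reason about as a pure function, while the in-place form avoids moving the group state and accepts a partial in whatever representation crosses a process boundary.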
    ```rust
    pub trait DynAccumulator: Send {
        fn accumulate(&mut self, batch: &ArrayRef) -> VortexResult<()>;
        fn accumulate_list(&mut self, list: &ListViewArray) -> VortexResult<()>;
    - fn accumulate_list(&mut self, list: &ListViewArray) -> VortexResult<()>;
    + fn accumulate_group(&mut self, list: &ListViewArray) -> VortexResult<()>;
        fn accumulate(&mut self, batch: &ArrayRef) -> VortexResult<()>;
        fn accumulate_list(&mut self, list: &ListViewArray) -> VortexResult<()>;
        fn merge(&mut self, state: &Scalar) -> VortexResult<()>;
        fn merge_list(&mut self, states: &ArrayRef) -> VortexResult<()>;
| | ------------ | ---------------------------------------- | ----------------------------------------- | | ||
| | `Sum` | `i64` (or widened input type) | `SumState::I64(Some(42))` | | ||
| | `Count` | `u64` | `u64` | | ||
| | `Min` | input element type | `MinState::I32(Some(3))` | |
This has to be optional in case there is no min.
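A small illustration of why the min needs to be optional, assuming plain `i32` slices in place of Vortex arrays (`list_min` is a hypothetical helper): an empty list has no minimum, so the result type has to be able to express that.

```rust
/// Minimum of one list's elements; an empty list has no minimum, hence `Option`.
pub fn list_min(values: &[i32]) -> Option<i32> {
    values.iter().copied().min()
}

/// Per-list minimum over a flat elements buffer addressed by (offset, size) pairs.
pub fn list_mins(elements: &[i32], offsets: &[usize], sizes: &[usize]) -> Vec<Option<i32>> {
    offsets
        .iter()
        .zip(sizes)
        .map(|(&offset, &len)| list_min(&elements[offset..offset + len]))
        .collect()
}
```

A non-optional state would force some sentinel value for the empty list, which then cannot be told apart from a real minimum.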
    fn state_dtype(&self, options: &Self::Options, input_dtype: &DType) -> VortexResult<DType>;

    fn identity(&self, options: &Self::Options, input_dtype: &DType)
        -> VortexResult<Self::GroupState>;
Seems like we actually want `None` here, since stats might actually be `None` rather than the group identity?
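One way to read the distinction raised above, sketched with plain `Option<i64>` values standing in for per-chunk statistics (`sum_from_stats` is a hypothetical name, and this interpretation of the comment is an assumption): the identity of a sum is `0`, but a missing statistic means "unknown", and treating it as `0` would silently produce a wrong total.

```rust
/// Combine per-chunk sum statistics. `Some(0)` is the identity (nothing to add);
/// `None` means a statistic was unavailable, so the total is unknown.
pub fn sum_from_stats(stats: &[Option<i64>]) -> Option<i64> {
    // `Option<i64>` implements `Sum<Option<i64>>`: any `None` makes the result `None`.
    stats.iter().copied().sum()
}
```

With no chunks at all the result is the identity, `Some(0)`, whereas a single missing statistic makes the whole result `None`, at which point the caller would have to fall back to actually accumulating.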
    /// Accumulate a canonical batch into the current group state.
    fn accumulate(
        &self, options: &Self::Options, state: &mut Self::GroupState, batch: &Canonical,
This is the fallback, and we have encoding-specific kernels?
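The "fast path first, canonical fallback otherwise" shape that the hunks suggest can be sketched as follows. `Encoded`, `try_fast_sum`, and `sum` are hypothetical stand-ins rather than Vortex types or methods; the sketch only shows the dispatch pattern, assuming a kernel signals "no shortcut" by returning `None`.

```rust
/// Hypothetical stand-in for an encoded array; not a Vortex type.
pub enum Encoded {
    Constant { value: i64, len: usize },
    Plain(Vec<i64>),
}

impl Encoded {
    /// Decode to the canonical (fully materialized) form.
    pub fn to_canonical(&self) -> Vec<i64> {
        match self {
            Encoded::Constant { value, len } => vec![*value; *len],
            Encoded::Plain(values) => values.clone(),
        }
    }
}

/// Encoding-specific kernel: returns `None` when the encoding has no shortcut.
pub fn try_fast_sum(array: &Encoded) -> Option<i64> {
    match array {
        Encoded::Constant { value, len } => Some(*value * *len as i64),
        Encoded::Plain(_) => None,
    }
}

/// Try the encoding-specific kernel first, then fall back to the canonical path.
pub fn sum(array: &Encoded) -> i64 {
    try_fast_sum(array).unwrap_or_else(|| array.to_canonical().iter().sum::<i64>())
}
```

Under this pattern the canonical `accumulate` only has to be correct, and each encoding can opt in to a faster kernel independently.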
        -> VortexResult<Self::GroupState>;

    /// Accumulate a canonical batch into the current group state.
    fn accumulate(
Does the attempt to pull the result out of stats happen here?