diff --git a/docs/groovy/reference/table-operations/create/rollup.md b/docs/groovy/reference/table-operations/create/rollup.md index 87966bbbb89..56779ad74f6 100644 --- a/docs/groovy/reference/table-operations/create/rollup.md +++ b/docs/groovy/reference/table-operations/create/rollup.md @@ -28,6 +28,8 @@ The following aggregations are supported: - [`AggCountDistinct`](../group-and-aggregate/AggCountDistinct.md) - [`AggCountWhere`](../group-and-aggregate/AggCountWhere.md) - [`AggFirst`](../group-and-aggregate/AggFirst.md) +- [`AggFormula`](../group-and-aggregate/AggFormula.md) +- [`AggGroup`](../group-and-aggregate/AggGroup.md) - [`AggLast`](../group-and-aggregate/AggLast.md) - [`AggMax`](../group-and-aggregate/AggMax.md) - [`AggMin`](../group-and-aggregate/AggMin.md) @@ -124,6 +126,78 @@ result = source.rollup(aggList, false, "N", "M") ![The above `result` rollup table](../../../assets/how-to/rollup-table-realtime.gif) +## Formula Aggregations in Rollups + +When a rollup includes a formula aggregation, care should be taken with the function being applied. On each tick, the formula is evaluated for each changed row in the output table. Because the aggregated rows include many source rows, the input vectors to a formula aggregation can be very large (at the root level, they are the entire source table). If the formula is not efficient with large input vectors, the performance of the rollup can be poor. + +By default, the formula aggregation operates on a group of all the values as they appeared in the source table. In this example, the `Value` column contains the same vector that is used as input to the formula: + +```groovy +source = newTable( + stringCol("Key", "Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Charlie", "Charlie"), + intCol("Value", 10, 10, 10, 20, 20, 30, 30)) +simpleSum = source.rollup(List.of(AggGroup("Value"), AggFormula("Sum = sum(Value)")), "Key") +``` + +To calculate the sum for the root row, every row in the source table is read. 
The Deephaven engine provides no mechanism to provide detailed update information for a vector. Thus, every time the table ticks, the formula is completely re-evaluated. + +### Formula Reaggregation + +Formula reaggregation can be used to limit the size of input vectors while evaluating changes to a rollup. Each level of the rollup must have the same constituent types and names, which can make formulating your query more complicated. + +```groovy +source = newTable( + stringCol("Key", "Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Charlie", "Charlie"), + intCol("Value", 10, 10, 10, 20, 20, 30, 30)) +reaggregatedSum = source.updateView("Sum=(long)Value").rollup(List.of(AggFormula("Sum = sum(Sum)").asReaggregating()), "Key") +``` + +The sums in `reaggregatedSum` are identical to those in `simpleSum`, but they are evaluated differently. In `simpleSum`, the source table is read twice: once to calculate the lowest-level sums, and a second time to calculate the top-level sum. In the reaggregated example, the source table is read once to calculate the lowest-level sums, and then the intermediate sums for each `Key` are read to calculate the top-level sum. If a row was added to `source` with the key `Delta`, then `simpleSum` would read that row, calculate a new sum for `Delta`, and the top level would read all eight rows of the table. The `reaggregatedSum` would similarly calculate the new sum for `Delta`, but the top level would only read the intermediate sums for `Alpha`, `Bravo`, `Charlie`, and `Delta` instead of all eight source rows. As the number of states and the size of the input tables increase, the performance impact of evaluating a formula over all rows of the table increases. + +In the previous example, the `Sum` column evaluated the [`sum(IntVector)`](https://docs.deephaven.io/core/javadoc/io/deephaven/function/Numeric.html#sum(io.deephaven.vector.IntVector)) function at every level of the rollup and produced a `long`. 
If the original table with an `int` column was used, then the lowest-level rollup would provide an `IntVector` as input to the `sum` and the next level would provide a `LongVector`. Similarly, the source table had a column named `Value`, whereas the aggregation produces a result named `Sum`. To address both of these issues, before passing `source` to rollup, we called `updateView` to cast the `Value` column to `long` as `Sum`. If we ran the same example without the cast: + +```groovy syntax +source = newTable( + stringCol("Key", "Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Charlie", "Charlie"), + intCol("Value", 10, 10, 10, 20, 20, 30, 30)) +reaggregatedSum = source.rollup(List.of(AggFormula("Value = sum(Value)").asReaggregating()), "Key") +``` + +We would instead get an exception message indicating that the formula cannot be applied properly: + +```text +java.lang.ClassCastException: class io.deephaven.engine.table.vectors.LongVectorColumnWrapper cannot be cast to class io.deephaven.vector.IntVector (io.deephaven.engine.table.vectors.LongVectorColumnWrapper and io.deephaven.vector.IntVector are in unnamed module of loader 'app') +``` + +### Formula Depth + +Formula aggregations may include the constant `__FORMULA_DEPTH__` column, which is the depth of the formula aggregation in the rollup tree. The root node of the rollup has a depth of 0, the next level is 1, and so on. This can be used to implement distinct aggregations at each level of the rollup. For example: + +```groovy +source = newTable( + stringCol("Key", "Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Charlie", "Charlie"), + intCol("Value", 10, 10, 10, 20, 20, 30, 30)) +firstThenSum = source.rollup(List.of(AggFormula("Value = __FORMULA_DEPTH__ == 0 ? sum(Value) : first(Value)")), "Key") +``` + +In this case, for each value of `Key`, the aggregation returns the first value. For the root level, the aggregation returns the sum of all values. 
When combined with a reaggregating formula, even more interesting semantics are possible. For example, rather than summing all of the values, we can sum the values from the prior level: + +```groovy +source = newTable( + stringCol("Key", "Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Charlie", "Charlie"), + intCol("Value", 10, 10, 10, 20, 20, 30, 30)) +firstThenSum = source.updateView("Value=(long)Value").rollup(List.of(AggFormula("Value = __FORMULA_DEPTH__ == 0 ? sum(Value) : first(Value)").asReaggregating()), "Key") +``` + +Another simple example of reaggregation is a capped sum. In this example, the sums below the root level are capped at 40: + +```groovy +source = newTable( + stringCol("Key", "Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Charlie", "Charlie"), + intCol("Value", 10, 20, 15, 20, 15, 25, 35)) +cappedSum = source.updateView("Value=(long)Value").rollup(List.of(AggFormula("Value = __FORMULA_DEPTH__ == 0 ? sum(Value) : min(sum(Value), 40)").asReaggregating()), "Key") +``` + ## Related documentation - [`AggAvg`](../group-and-aggregate/AggAvg.md) diff --git a/docs/groovy/reference/table-operations/group-and-aggregate/AggFormula.md b/docs/groovy/reference/table-operations/group-and-aggregate/AggFormula.md index d82a335f7bf..bec64775953 100644 --- a/docs/groovy/reference/table-operations/group-and-aggregate/AggFormula.md +++ b/docs/groovy/reference/table-operations/group-and-aggregate/AggFormula.md @@ -7,6 +7,7 @@ title: AggFormula ## Syntax ``` +AggFormula(formula) AggFormula(formula, paramToken, columnNames...) ``` @@ -21,19 +22,21 @@ The user-defined formula to apply to each group. This formula can contain: - Mathematical operations such as `*`, `+`, `/`, etc. - [User-defined closures](../../../how-to-guides/groovy-closures.md) -If `paramToken` is not `null`, the formula can only be applied to one column at a time, and it is applied to the specified `paramToken`. 
If `paramToken` is `null`, the formula is applied to any column or literal value present in the formula. The use of `paramToken` is deprecated. +The formula is typically specified as `OutputColumn = Expression` (when `paramToken` is `null`). The formula is applied to any column or literal value present in the formula. For example, `Out = KeyColumn * max(ValueColumn)` produces an output column with the name `Out` and uses `KeyColumn` and `ValueColumn` as inputs. + +If `paramToken` is not `null`, the formula can only be applied to one column at a time, and it is applied to the specified `paramToken`. In this case, the formula does not supply an output column name; the output column is instead derived from the `columnNames` parameter. The use of `paramToken` is deprecated. Key column(s) can be used as input to the formula. When this happens, key values are treated as scalars. -The parameter name within the formula. If `paramToken` is `each`, then `formula` must contain `each`. For example, `max(each)`, `min(each)`, etc. Use of this parameter is deprecated. +The parameter name within the formula. If `paramToken` is `each`, then `formula` must contain `each`. For example, `max(each)`, `min(each)`, etc. Use of this parameter is deprecated. A non-null value is not permitted in [rollups](../create/rollup.md). -The source column(s) for the calculations. +The source column(s) for the calculations. The source column names are only used when `paramToken` is not `null`, and are thus similarly deprecated. - `"X"` applies the formula to each value in the `X` column for each group. - `"Y = X"` applies the formula to each value in the `X` column for each group and renames it to `Y`. 
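The capped-sum rollup documented in `rollup.md` in this change can be sketched in plain Java to show what reaggregation does at each depth. This is only an illustration under stated assumptions: it uses no Deephaven API, and `CappedSumSketch` and its method names are hypothetical. The per-key step mirrors `min(sum(Value), 40)` at depth 1, and the root step mirrors `sum(Value)` at depth 0 over the intermediate results rather than over the source rows.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; this is not how the Deephaven engine is implemented.
public class CappedSumSketch {

    /** Depth 1: sum each key's source values, then cap the sum at {@code cap}. */
    static Map<String, Long> cappedSumsByKey(String[] keys, int[] values, long cap) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            sums.merge(keys[i], (long) values[i], Long::sum);
        }
        sums.replaceAll((key, sum) -> Math.min(sum, cap));
        return sums;
    }

    /** Depth 0, reaggregating: reads only the intermediate per-key results. */
    static long reaggregatedRoot(Map<String, Long> intermediate) {
        long total = 0;
        for (long value : intermediate.values()) {
            total += value;
        }
        return total;
    }

    /** Depth 0, not reaggregating: reads every source row. */
    static long fullScanRoot(int[] values) {
        long total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        String[] keys = {"Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Charlie", "Charlie"};
        int[] values = {10, 20, 15, 20, 15, 25, 35};

        Map<String, Long> perKey = cappedSumsByKey(keys, values, 40);
        System.out.println(perKey);                   // {Alpha=40, Bravo=35, Charlie=40}
        System.out.println(reaggregatedRoot(perKey)); // 115
        System.out.println(fullScanRoot(values));     // 140
    }
}
```

The per-key values and the reaggregated root of 115 match the `cappedSum` snapshot added in this change. The full-scan total of 140 is shown only for contrast: it is what summing every source row yields, and it reads seven values where the reaggregated root reads three.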
diff --git a/docs/groovy/snapshots/147d1e4ef58979040021c4e15859a139.json b/docs/groovy/snapshots/147d1e4ef58979040021c4e15859a139.json new file mode 100644 index 00000000000..7a63af13972 --- /dev/null +++ b/docs/groovy/snapshots/147d1e4ef58979040021c4e15859a139.json @@ -0,0 +1 @@ +{"file":"reference/table-operations/create/rollup.md","objects":{"source":{"type":"Table","data":{"columns":[{"name":"Key","type":"java.lang.String"},{"name":"Value","type":"int"}],"rows":[[{"value":"Alpha"},{"value":"10"}],[{"value":"Alpha"},{"value":"10"}],[{"value":"Alpha"},{"value":"10"}],[{"value":"Bravo"},{"value":"20"}],[{"value":"Bravo"},{"value":"20"}],[{"value":"Charlie"},{"value":"30"}],[{"value":"Charlie"},{"value":"30"}]]}},"reaggregatedSum":{"type":"HierarchicalTable","data":{"columns":[{"name":"Key","type":"java.lang.String"},{"name":"Sum","type":"long"}],"rows":[[{"value":""},{"value":"130"}],[{"value":"Alpha"},{"value":"30"}],[{"value":"Bravo"},{"value":"40"}],[{"value":"Charlie"},{"value":"60"}]],"rowDepths":[1,2,2,2]}}}} \ No newline at end of file diff --git a/docs/groovy/snapshots/74a7db0b5abe5682f828a576c69e2222.json b/docs/groovy/snapshots/74a7db0b5abe5682f828a576c69e2222.json deleted file mode 100644 index 25793bcdd4b..00000000000 --- a/docs/groovy/snapshots/74a7db0b5abe5682f828a576c69e2222.json +++ /dev/null @@ -1 +0,0 @@ -{"file":"how-to-guides/excel/excel-client.md","objects":{"static_table":{"type":"Table","data":{"columns":[{"name":"X","type":"int"}],"rows":[[{"value":"5"}],[{"value":"2"}],[{"value":"9"}],[{"value":"8"}],[{"value":"1"}],[{"value":"7"}],[{"value":"4"}],[{"value":"8"}],[{"value":"6"}],[{"value":"9"}]]}}}} \ No newline at end of file diff --git a/docs/groovy/snapshots/a79ff17b43af37762a7fc93060ed494e.json b/docs/groovy/snapshots/a79ff17b43af37762a7fc93060ed494e.json new file mode 100644 index 00000000000..4abb4083908 --- /dev/null +++ b/docs/groovy/snapshots/a79ff17b43af37762a7fc93060ed494e.json @@ -0,0 +1 @@ 
+{"file":"reference/table-operations/create/rollup.md","objects":{"source":{"type":"Table","data":{"columns":[{"name":"Key","type":"java.lang.String"},{"name":"Value","type":"int"}],"rows":[[{"value":"Alpha"},{"value":"10"}],[{"value":"Alpha"},{"value":"10"}],[{"value":"Alpha"},{"value":"10"}],[{"value":"Bravo"},{"value":"20"}],[{"value":"Bravo"},{"value":"20"}],[{"value":"Charlie"},{"value":"30"}],[{"value":"Charlie"},{"value":"30"}]]}},"simpleSum":{"type":"HierarchicalTable","data":{"columns":[{"name":"Key","type":"java.lang.String"},{"name":"Value","type":"io.deephaven.vector.IntVector"},{"name":"Sum","type":"long"}],"rows":[[{"value":""},{"value":"10,10,10,20,20,30,30"},{"value":"130"}],[{"value":"Alpha"},{"value":"10,10,10"},{"value":"30"}],[{"value":"Bravo"},{"value":"20,20"},{"value":"40"}],[{"value":"Charlie"},{"value":"30,30"},{"value":"60"}]],"rowDepths":[1,2,2,2]}}}} \ No newline at end of file diff --git a/docs/groovy/snapshots/db3562b0dc23b199a91d0762dad1ac03.json b/docs/groovy/snapshots/db3562b0dc23b199a91d0762dad1ac03.json deleted file mode 100644 index d22df14b1c1..00000000000 --- a/docs/groovy/snapshots/db3562b0dc23b199a91d0762dad1ac03.json +++ /dev/null @@ -1 +0,0 @@ -{"file":"how-to-guides/excel/excel-client.md","objects":{"crypto_table":{"type":"Table","data":{"columns":[{"name":"Timestamp","type":"java.time.Instant"},{"name":"Exchange","type":"java.lang.String"},{"name":"Price","type":"double"},{"name":"Size","type":"double"}],"rows":[]}}}} \ No newline at end of file diff --git a/docs/groovy/snapshots/df5a083869378962413e5e1ec88361d6.json b/docs/groovy/snapshots/df5a083869378962413e5e1ec88361d6.json new file mode 100644 index 00000000000..3ddb3f43ca3 --- /dev/null +++ b/docs/groovy/snapshots/df5a083869378962413e5e1ec88361d6.json @@ -0,0 +1 @@ 
+{"file":"reference/table-operations/create/rollup.md","objects":{"source":{"type":"Table","data":{"columns":[{"name":"Key","type":"java.lang.String"},{"name":"Value","type":"int"}],"rows":[[{"value":"Alpha"},{"value":"10"}],[{"value":"Alpha"},{"value":"20"}],[{"value":"Alpha"},{"value":"15"}],[{"value":"Bravo"},{"value":"20"}],[{"value":"Bravo"},{"value":"15"}],[{"value":"Charlie"},{"value":"25"}],[{"value":"Charlie"},{"value":"35"}]]}},"cappedSum":{"type":"HierarchicalTable","data":{"columns":[{"name":"Key","type":"java.lang.String"},{"name":"Value","type":"long"}],"rows":[[{"value":""},{"value":"115"}],[{"value":"Alpha"},{"value":"40"}],[{"value":"Bravo"},{"value":"35"}],[{"value":"Charlie"},{"value":"40"}]],"rowDepths":[1,2,2,2]}}}} \ No newline at end of file diff --git a/docs/groovy/snapshots/f1ee5c94013279dcc75428cd1e365ea4.json b/docs/groovy/snapshots/f1ee5c94013279dcc75428cd1e365ea4.json new file mode 100644 index 00000000000..a532e456acd --- /dev/null +++ b/docs/groovy/snapshots/f1ee5c94013279dcc75428cd1e365ea4.json @@ -0,0 +1 @@ +{"file":"reference/table-operations/create/rollup.md","objects":{"source":{"type":"Table","data":{"columns":[{"name":"Key","type":"java.lang.String"},{"name":"Value","type":"int"}],"rows":[[{"value":"Alpha"},{"value":"10"}],[{"value":"Alpha"},{"value":"10"}],[{"value":"Alpha"},{"value":"10"}],[{"value":"Bravo"},{"value":"20"}],[{"value":"Bravo"},{"value":"20"}],[{"value":"Charlie"},{"value":"30"}],[{"value":"Charlie"},{"value":"30"}]]}},"firstThenSum":{"type":"HierarchicalTable","data":{"columns":[{"name":"Key","type":"java.lang.String"},{"name":"Value","type":"long"}],"rows":[[{"value":""},{"value":"130"}],[{"value":"Alpha"},{"value":"10"}],[{"value":"Bravo"},{"value":"20"}],[{"value":"Charlie"},{"value":"30"}]],"rowDepths":[1,2,2,2]}}}} \ No newline at end of file diff --git a/docs/groovy/snapshots/f2208a297db36f58594c7ecc25d3713f.json b/docs/groovy/snapshots/f2208a297db36f58594c7ecc25d3713f.json new file mode 100644 index 
00000000000..f081a76cd49 --- /dev/null +++ b/docs/groovy/snapshots/f2208a297db36f58594c7ecc25d3713f.json @@ -0,0 +1 @@ +{"file":"reference/table-operations/create/rollup.md","objects":{"source":{"type":"Table","data":{"columns":[{"name":"Key","type":"java.lang.String"},{"name":"Value","type":"int"}],"rows":[[{"value":"Alpha"},{"value":"10"}],[{"value":"Alpha"},{"value":"10"}],[{"value":"Alpha"},{"value":"10"}],[{"value":"Bravo"},{"value":"20"}],[{"value":"Bravo"},{"value":"20"}],[{"value":"Charlie"},{"value":"30"}],[{"value":"Charlie"},{"value":"30"}]]}},"firstThenSum":{"type":"HierarchicalTable","data":{"columns":[{"name":"Key","type":"java.lang.String"},{"name":"Value","type":"long"}],"rows":[[{"value":""},{"value":"60"}],[{"value":"Alpha"},{"value":"10"}],[{"value":"Bravo"},{"value":"20"}],[{"value":"Charlie"},{"value":"30"}]],"rowDepths":[1,2,2,2]}}}} \ No newline at end of file diff --git a/docs/python/snapshots/74a7db0b5abe5682f828a576c69e2222.json b/docs/python/snapshots/74a7db0b5abe5682f828a576c69e2222.json deleted file mode 100644 index 33cb2460f88..00000000000 --- a/docs/python/snapshots/74a7db0b5abe5682f828a576c69e2222.json +++ /dev/null @@ -1 +0,0 @@ -{"file":"core/docs/how-to-guides/excel/excel-add-in.md","objects":{"static_table":{"type":"Table","data":{"columns":[{"name":"X","type":"int"}],"rows":[[{"value":"4"}],[{"value":"7"}],[{"value":"7"}],[{"value":"6"}],[{"value":"7"}],[{"value":"9"}],[{"value":"1"}],[{"value":"2"}],[{"value":"8"}],[{"value":"4"}]]}}}} \ No newline at end of file diff --git a/docs/python/snapshots/db3562b0dc23b199a91d0762dad1ac03.json b/docs/python/snapshots/db3562b0dc23b199a91d0762dad1ac03.json deleted file mode 100644 index f0726a5616d..00000000000 --- a/docs/python/snapshots/db3562b0dc23b199a91d0762dad1ac03.json +++ /dev/null @@ -1 +0,0 @@ 
-{"file":"core/docs/how-to-guides/excel/excel-client.md","objects":{"crypto_table":{"type":"Table","data":{"columns":[{"name":"Timestamp","type":"java.time.Instant"},{"name":"Exchange","type":"java.lang.String"},{"name":"Price","type":"double"},{"name":"Size","type":"double"}],"rows":[]}}}} \ No newline at end of file diff --git a/engine/api/src/main/java/io/deephaven/engine/table/Table.java b/engine/api/src/main/java/io/deephaven/engine/table/Table.java index f297cda94b6..6703718ace8 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/Table.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/Table.java @@ -567,6 +567,11 @@ CloseableIterator objectColumnIterator(@NotNull String co *

* *

+ * A blink table can be converted to an append-only table with + * {@link io.deephaven.engine.table.impl.BlinkTableTools#blinkToAppendOnly(io.deephaven.engine.table.Table)}. + *

+ * + *

* Some aggregations (in particular {@link #groupBy} and {@link #partitionBy} cannot provide the desired blink table * aggregation semantics because doing so would require storing the entire stream of blink updates in memory. If * that behavior is desired, use {@code blinkToAppendOnly}. If on the other hand, you would like to group or diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationContext.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationContext.java index fd9ee394974..02f5958f67a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationContext.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationContext.java @@ -270,10 +270,12 @@ UnaryOperator[] initializeRefreshing(@NotNull final QueryTabl * * @param upstream The upstream {@link TableUpdateImpl} * @param startingDestinationsCount The number of used destinations at the beginning of this step + * @param modifiedOperators an array of booleans, parallel to operators, indicating which operators were modified */ - void resetOperatorsForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { - for (final IterativeChunkedAggregationOperator operator : operators) { - operator.resetForStep(upstream, startingDestinationsCount); + void resetOperatorsForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount, + final boolean[] modifiedOperators) { + for (int ii = 0; ii < operators.length; ii++) { + modifiedOperators[ii] = operators[ii].resetForStep(upstream, startingDestinationsCount); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java index 28d9d96f8cd..9812aaf7ae6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java +++ 
b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java @@ -33,11 +33,8 @@ import io.deephaven.base.verify.Assert; import io.deephaven.chunk.ChunkType; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.table.ChunkSource; -import io.deephaven.engine.table.ColumnDefinition; -import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.*; -import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.by.rollup.NullColumns; import io.deephaven.engine.table.impl.by.rollup.RollupAggregation; import io.deephaven.engine.table.impl.by.rollup.RollupAggregationOutputs; @@ -93,10 +90,12 @@ import io.deephaven.engine.table.impl.by.ssmpercentile.SsmChunkedPercentileOperator; import io.deephaven.engine.table.impl.select.SelectColumn; import io.deephaven.engine.table.impl.select.WhereFilter; +import io.deephaven.engine.table.impl.sources.IntegerSingleValueSource; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; import io.deephaven.engine.table.impl.ssms.SegmentedSortedMultiSet; import io.deephaven.engine.table.impl.util.freezeby.FreezeByCountOperator; import io.deephaven.engine.table.impl.util.freezeby.FreezeByOperator; +import io.deephaven.qst.type.IntType; import io.deephaven.time.DateTimeUtils; import io.deephaven.util.annotations.FinalDefault; import io.deephaven.util.type.ArrayTypeUtils; @@ -201,12 +200,14 @@ public static AggregationContextFactory forRollupBase( * mutated by {@link AggregationProcessor}. 
* @param nullColumns Map of group-by column names and data types to aggregate with a null-column aggregation * @param rollupColumn the name of the rollup column in the result, used to traverse to the next lower level nodes + * @param source the original source table of the rollup (not the table we are reaggregating) * @return The {@link AggregationContextFactory} */ public static AggregationContextFactory forRollupReaggregated( @NotNull final Collection aggregations, @NotNull final Collection> nullColumns, - @NotNull final ColumnName rollupColumn) { + @NotNull final ColumnName rollupColumn, + @NotNull final Table source) { if (aggregations.stream().anyMatch(agg -> agg instanceof Partition)) { rollupUnsupported("Partition"); } @@ -214,7 +215,7 @@ public static AggregationContextFactory forRollupReaggregated( reaggregations.add(RollupAggregation.nullColumns(nullColumns)); reaggregations.addAll(aggregations); reaggregations.add(Partition.of(rollupColumn)); - return new AggregationProcessor(reaggregations, Type.ROLLUP_REAGGREGATED); + return new WithSource(reaggregations, Type.ROLLUP_REAGGREGATED, source); } /** @@ -263,6 +264,8 @@ public static AggregationContextFactory forSelectDistinct() { public static final ColumnName EXPOSED_GROUP_ROW_SETS = ColumnName.of("__EXPOSED_GROUP_ROW_SETS__"); + public static final ColumnName ROLLUP_FORMULA_DEPTH = ColumnName.of("__FORMULA_DEPTH__"); + /** * Create a trivial {@link AggregationContextFactory} to {@link Aggregation#AggGroup(String...) group} the input * table and expose the group {@link io.deephaven.engine.rowset.RowSet row sets} as {@link #EXPOSED_GROUP_ROW_SETS}. 
@@ -732,6 +735,154 @@ final void addCountWhereOperator(@NotNull CountWhere countWhere) { addOperator(new CountWhereOperator(countWhere.column().name(), whereFilters, recorders, filterRecorders), null, inputColumnNames); } + + /** + * @return the index of an existing group by operator, or -1 if no operator was found + */ + int existingGroupByOperatorIndex() { + for (int ii = 0; ii < operators.size(); ++ii) { + if (operators.get(ii) instanceof GroupByChunkedOperator) { + return ii; + } + } + return -1; + } + + /** + * @return the index of an existing group by reaggregation operator, or -1 if no operator was found + */ + int existingGroupByReaggregateIndex() { + for (int ii = 0; ii < operators.size(); ++ii) { + if (operators.get(ii) instanceof GroupByReaggregateOperator) { + return ii; + } + } + return -1; + } + + /** + * Ensures that the existing GroupByChunkedOperator has the required input/output columns + * + * @param createExtraPairs When true, create all of the pairs for the group by operator. When false, if there + * are any inputs that match the pairs we'll pass the operator through. The AggGroup aggregation can't + * just tack pairs onto an existing operator, because the order would be incorrect. Formulas in a rollup + * don't expose the results of the shared grouping, so just tacking them on is fine. 
+ * @param hideExtras true if the extra pairs should be hidden from results, false otherwise + */ + GroupByChunkedOperator ensureGroupByOperator(final QueryTable table, + final int existingOperatorIndex, + final String exposeRowSetAs, + final MatchPair[] matchPairs, + final boolean createExtraPairs, + final boolean hideExtras) { + boolean recreate = false; + final GroupByChunkedOperator existing = (GroupByChunkedOperator) operators.get(existingOperatorIndex); + if (exposeRowSetAs != null) { + if (existing.getExposedRowSetsAs() == null) { + recreate = true; + } else { + if (!existing.getExposedRowSetsAs().equals(exposeRowSetAs)) { + throw new UnsupportedOperationException( + "AggGroupBy cannot have inconsistent exposed row redirections names: " + + existing.getExposedRowSetsAs() + " != " + exposeRowSetAs); + } + } + } + final List newPairs = new ArrayList<>(Arrays.asList(existing.getAggregatedColumnPairs())); + List existingHidden = existing.getHiddenResults(); + final List hiddenResults = new ArrayList<>(existingHidden == null ? List.of() : existingHidden); + for (MatchPair matchPair : matchPairs) { + final String input = matchPair.input().name(); + if (Arrays.stream(existing.getAggregatedColumnPairs()).noneMatch(p -> p.input().name().equals(input))) { + // we didn't have this in the input at all + newPairs.add(matchPair); + hiddenResults.add(matchPair.output().name()); + recreate = true; + } else if (createExtraPairs + && Arrays.stream(existing.getAggregatedColumnPairs()).noneMatch(p -> p.equals(matchPair))) { + newPairs.add(matchPair); + if (hideExtras) { + hiddenResults.add(matchPair.output().name()); + } + recreate = true; + } + } + if (!recreate) { + // we're totally satisfied with the existing operator for use with a secondary operator that pulls an + // output from it to the desired name + return existing; + } + + final String newExposeRowsetName = exposeRowSetAs == null ? 
existing.getExposedRowSetsAs() : exposeRowSetAs; + final MatchPair[] newMatchPairArray = newPairs.toArray(MatchPair[]::new); + final GroupByChunkedOperator newOperator = + new GroupByChunkedOperator(table, true, newExposeRowsetName, hiddenResults, newMatchPairArray); + + // any formula operators that used the old group by operator must be updated + for (IterativeChunkedAggregationOperator operator : operators) { + if (operator instanceof FormulaMultiColumnChunkedOperator) { + ((FormulaMultiColumnChunkedOperator) operator).updateGroupBy(newOperator, false); + } else if (operator instanceof FormulaChunkedOperator) { + ((FormulaChunkedOperator) operator).updateGroupBy(newOperator, false); + } + } + + operators.set(existingOperatorIndex, newOperator); + return newOperator; + } + + GroupByReaggregateOperator ensureGroupByReaggregateOperator(final QueryTable table, + final int existingOperatorIndex, + final String exposeRowSetAs, + final MatchPair[] matchPairs) { + boolean recreate = false; + final GroupByReaggregateOperator existing = + (GroupByReaggregateOperator) operators.get(existingOperatorIndex); + if (exposeRowSetAs != null) { + if (existing.getExposedRowSetsAs() == null) { + recreate = true; + } else { + if (!existing.getExposedRowSetsAs().equals(exposeRowSetAs)) { + throw new UnsupportedOperationException( + "AggGroupBy cannot have inconsistent exposed row redirections names: " + + existing.getExposedRowSetsAs() + " != " + exposeRowSetAs); + } + } + } + final List newPairs = new ArrayList<>(Arrays.asList(existing.getAggregatedColumnPairs())); + List existingHidden = existing.getHiddenResults(); + final List hiddenResults = new ArrayList<>(existingHidden == null ? 
List.of() : existingHidden); + for (MatchPair matchPair : matchPairs) { + final String input = matchPair.input().name(); + if (Arrays.stream(existing.getAggregatedColumnPairs()).noneMatch(p -> p.input().name().equals(input))) { + newPairs.add(matchPair); + hiddenResults.add(matchPair.output().name()); + recreate = true; + } + } + if (!recreate) { + // we're totally satisfied with the existing operator for use with a secondary operator that pulls an + // output from it to the desired name + return existing; + } + + final String newExposeRowsetName = exposeRowSetAs == null ? existing.getExposedRowSetsAs() : exposeRowSetAs; + final MatchPair[] newMatchPairArray = newPairs.toArray(MatchPair[]::new); + final GroupByReaggregateOperator newOperator = + new GroupByReaggregateOperator(table, true, newExposeRowsetName, hiddenResults, newMatchPairArray); + + // any formula operators that used the old group by operator must be updated + for (IterativeChunkedAggregationOperator operator : operators) { + // Only FormulaMultiColumn operators need to be adjusted, a FormulaChunkedOperator cannot participate + // in a rollup. + if (operator instanceof FormulaMultiColumnChunkedOperator) { + ((FormulaMultiColumnChunkedOperator) operator).updateGroupBy(newOperator, false); + } + } + + operators.set(existingOperatorIndex, newOperator); + return newOperator; + } } // ----------------------------------------------------------------------------------------------------------------- @@ -798,21 +949,13 @@ public void visit(@NotNull final Partition partition) { @Override public void visit(@NotNull final Formula formula) { + validateFormulaIsNotReaggregating(formula); final SelectColumn selectColumn = SelectColumn.of(formula.selectable()); // Get or create a column definition map composed of vectors of the original column types (or scalars when // part of the key columns). 
final Set groupByColumnSet = Set.of(groupByColumnNames); - if (vectorColumnDefinitions == null) { - vectorColumnDefinitions = table.getDefinition().getColumnStream().collect(Collectors.toMap( - ColumnDefinition::getName, - (final ColumnDefinition cd) -> groupByColumnSet.contains(cd.getName()) - ? cd - : ColumnDefinition.fromGenericType( - cd.getName(), - VectorFactory.forElementType(cd.getDataType()).vectorType(), - cd.getDataType()))); - } + maybeInitializeVectorColumns(groupByColumnSet, table.getDefinition(), Map.of()); // Get the input column names from the formula and provide them to the groupBy operator final String[] allInputColumns = @@ -823,20 +966,21 @@ public void visit(@NotNull final Formula formula) { final String[] inputKeyColumns = partitioned.get(true).toArray(String[]::new); final String[] inputNonKeyColumns = partitioned.get(false).toArray(String[]::new); - if (!selectColumn.getColumnArrays().isEmpty()) { - throw new IllegalArgumentException("AggFormula does not support column arrays (" - + selectColumn.getColumnArrays() + ")"); - } - if (selectColumn.hasVirtualRowVariables()) { - throw new IllegalArgumentException("AggFormula does not support virtual row variables"); + validateSelectColumnForFormula(selectColumn); + final GroupByChunkedOperator groupByChunkedOperator; + final int existingGroupByOperatorIndex = existingGroupByOperatorIndex(); + if (existingGroupByOperatorIndex >= 0) { + // if we have an existing group by operator, then use it (or update it to reflect our input columns) + groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex, null, + makeSymmetricMatchPairs(inputNonKeyColumns), false, false); + } else { + groupByChunkedOperator = + makeGroupByOperatorForFormula(makeSymmetricMatchPairs(inputNonKeyColumns), table, null); } - // TODO: re-use shared groupBy operators (https://github.com/deephaven/deephaven-core/issues/6363) - final GroupByChunkedOperator groupByChunkedOperator = new 
GroupByChunkedOperator(table, false, null, - Arrays.stream(inputNonKeyColumns).map(col -> MatchPair.of(Pair.parse(col))) - .toArray(MatchPair[]::new)); final FormulaMultiColumnChunkedOperator op = new FormulaMultiColumnChunkedOperator(table, - groupByChunkedOperator, true, selectColumn, inputKeyColumns); + groupByChunkedOperator, existingGroupByOperatorIndex < 0, selectColumn, inputKeyColumns, null, + null); addNoInputOperator(op); } @@ -878,8 +1022,9 @@ public void visit(@NotNull final AggSpecFirst first) { @Override public void visit(@NotNull final AggSpecFormula formula) { unsupportedForBlinkTables("Formula"); - // TODO: re-use shared groupBy operators (https://github.com/deephaven/deephaven-core/issues/6363) - final GroupByChunkedOperator groupByChunkedOperator = new GroupByChunkedOperator(table, false, null, + // Note: we do not attempt to reuse the groupBy operator for the deprecated "each" formula, we only reuse + // them for the new-style multi-column formula operators + final GroupByChunkedOperator groupByChunkedOperator = new GroupByChunkedOperator(table, false, null, null, resultPairs.stream().map(pair -> MatchPair.of((Pair) pair.input())).toArray(MatchPair[]::new)); final FormulaChunkedOperator formulaChunkedOperator = new FormulaChunkedOperator(groupByChunkedOperator, true, formula.formula(), formula.paramToken(), compilationProcessor, @@ -895,7 +1040,18 @@ public void visit(AggSpecFreeze freeze) { @Override public void visit(@NotNull final AggSpecGroup group) { unsupportedForBlinkTables("Group"); - addNoInputOperator(new GroupByChunkedOperator(table, true, null, MatchPair.fromPairs(resultPairs))); + + final int existingOperator = existingGroupByOperatorIndex(); + if (existingOperator >= 0) { + // Reuse the operator, adding a result extractor for the new result pairs + GroupByChunkedOperator existing = + ensureGroupByOperator(table, existingOperator, null, MatchPair.fromPairs(resultPairs), false, + false); + 
addNoInputOperator(existing.resultExtractor(resultPairs)); + } else { + addNoInputOperator( + new GroupByChunkedOperator(table, true, null, null, MatchPair.fromPairs(resultPairs))); + } } @Override @@ -972,6 +1128,72 @@ public void visit(@NotNull final AggSpecVar var) { } } + private static void validateSelectColumnForFormula(SelectColumn selectColumn) { + if (!selectColumn.getColumnArrays().isEmpty()) { + throw new IllegalArgumentException("AggFormula does not support column arrays (" + + selectColumn.getColumnArrays() + ")"); + } + if (selectColumn.hasVirtualRowVariables()) { + throw new IllegalArgumentException("AggFormula does not support virtual row variables"); + } + } + + private static void validateFormulaIsNotReaggregating(Formula formula) { + if (formula.reaggregateAggregatedValues()) { + throw new IllegalArgumentException("AggFormula does not support reaggregating except in a rollup."); + } + } + + private void maybeInitializeVectorColumns(Set groupByColumnSet, final TableDefinition definition, + Map> extraColumns) { + if (vectorColumnDefinitions != null) { + return; + } + vectorColumnDefinitions = new LinkedHashMap<>(); + definition.getColumnStream().forEach(cd -> { + ColumnDefinition resultDefinition; + if (groupByColumnSet.contains(cd.getName())) { + resultDefinition = cd; + } else { + resultDefinition = ColumnDefinition.fromGenericType( + cd.getName(), + VectorFactory.forElementType(cd.getDataType()).vectorType(), + cd.getDataType()); + } + vectorColumnDefinitions.put(cd.getName(), resultDefinition); + }); + vectorColumnDefinitions.putAll(extraColumns); + } + + + private @NotNull GroupByChunkedOperator makeGroupByOperatorForFormula(final MatchPair[] pairs, + final QueryTable table, final String exposedRowsets) { + final boolean register = exposedRowsets != null; + return new GroupByChunkedOperator(table, register, exposedRowsets, null, pairs); + } + + /** + * Convert the array of column names to MatchPairs of the form {@code Col_GRP__ROLLUP__} + * + 
* @param cols the columns to convert + * @return the mangled name matchpairs + */ + private static MatchPair @NotNull [] makeMangledMatchPairs(String[] cols) { + return Arrays + .stream(cols).map(col -> new MatchPair(col + ROLLUP_GRP_COLUMN_ID + ROLLUP_COLUMN_SUFFIX, col)) + .toArray(MatchPair[]::new); + } + + /** + * Convert the array of strings to MatchPairs of the form Col=Col + * + * @param columns the columns to convert to MatchPairs + * @return an array of MatchPairs + */ + private static MatchPair @NotNull [] makeSymmetricMatchPairs(String[] columns) { + return Arrays.stream(columns).map(col -> new MatchPair(col, col)).toArray(MatchPair[]::new); + } + // ----------------------------------------------------------------------------------------------------------------- // Rollup Unsupported Operations // ----------------------------------------------------------------------------------------------------------------- @@ -994,12 +1216,6 @@ default void visit(@NotNull final LastRowKey lastRowKey) { rollupUnsupported("LastRowKey"); } - @Override - @FinalDefault - default void visit(@NotNull final Formula formula) { - rollupUnsupported("Formula"); - } - // ------------------------------------------------------------------------------------------------------------- // AggSpec.Visitor for unsupported column aggregation specs // ------------------------------------------------------------------------------------------------------------- @@ -1015,12 +1231,6 @@ default void visit(AggSpecFreeze freeze) { rollupUnsupported("Freeze"); } - @Override - @FinalDefault - default void visit(@NotNull final AggSpecGroup group) { - rollupUnsupported("Group"); - } - @Override @FinalDefault default void visit(@NotNull final AggSpecFormula formula) { @@ -1066,6 +1276,7 @@ private static void rollupUnsupported(@NotNull final String operationName, final */ private final class RollupBaseConverter extends Converter implements RollupAggregation.Visitor, UnsupportedRollupAggregations { + 
private final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor; private int nextColumnIdentifier = 0; @@ -1074,6 +1285,14 @@ private RollupBaseConverter( final boolean requireStateChangeRecorder, @NotNull final String... groupByColumnNames) { super(table, requireStateChangeRecorder, groupByColumnNames); + this.compilationProcessor = QueryCompilerRequestProcessor.batch(); + } + + @Override + AggregationContext build() { + final AggregationContext resultContext = super.build(); + compilationProcessor.compile(); + return resultContext; } // ------------------------------------------------------------------------------------------------------------- @@ -1108,6 +1327,90 @@ public void visit(@NotNull final Partition partition) { addNoInputOperator(partitionOperator); } + @Override + public void visit(AggSpecGroup group) { + unsupportedForBlinkTables("Group for rollup"); + + final int indexOfExistingOperator = existingGroupByOperatorIndex(); + if (indexOfExistingOperator >= 0) { + // share the existing operator for groupBy in a rollup base + final GroupByChunkedOperator existing = ensureGroupByOperator(table, indexOfExistingOperator, + EXPOSED_GROUP_ROW_SETS.name(), MatchPair.fromPairs(resultPairs), false, false); + addNoInputOperator(existing.resultExtractor(resultPairs)); + } else { + addNoInputOperator(new GroupByChunkedOperator(table, true, EXPOSED_GROUP_ROW_SETS.name(), + null, + MatchPair.fromPairs(resultPairs))); + } + } + + @Override + public void visit(Formula formula) { + unsupportedForBlinkTables("Formula for rollup"); + + final SelectColumn selectColumn = SelectColumn.of(formula.selectable()); + + // Get or create a column definition map composed of vectors of the original column types (or scalars when + // part of the key columns). 
+ final Set groupByColumnSet = Set.of(groupByColumnNames); + // For the base of a rollup, we use the original table definition, but tack on a rollup depth column + maybeInitializeVectorColumns(groupByColumnSet, table.getDefinition(), Map.of(ROLLUP_FORMULA_DEPTH.name(), + ColumnDefinition.of(ROLLUP_FORMULA_DEPTH.name(), IntType.of()))); + + // Get the input column names from the formula and provide them to the groupBy operator + final String[] allInputColumns = + selectColumn.initDef(vectorColumnDefinitions, compilationProcessor).toArray(String[]::new); + + final Map> partitioned = Arrays.stream(allInputColumns) + .collect(Collectors.partitioningBy( + o -> groupByColumnSet.contains(o) || ROLLUP_FORMULA_DEPTH.name().equals(o))); + final String[] inputKeyColumns = partitioned.get(true).toArray(String[]::new); + final String[] inputNonKeyColumns = partitioned.get(false).toArray(String[]::new); + + validateSelectColumnForFormula(selectColumn); + + final GroupByChunkedOperator groupByChunkedOperator; + final boolean delegate; + + final int existingGroupByOperatorIndex = existingGroupByOperatorIndex(); + final MatchPair[] mangledMatchPairs = makeMangledMatchPairs(inputNonKeyColumns); + + if (formula.reaggregateAggregatedValues()) { + if (existingGroupByOperatorIndex >= 0) { + groupByChunkedOperator = + ensureGroupByOperator(table, existingGroupByOperatorIndex, null, mangledMatchPairs, true, + true); + delegate = false; + } else { + // When we are reaggregating, we do not expose the rowsets, because the next level creates a + // completely fresh operator + groupByChunkedOperator = makeGroupByOperatorForFormula(mangledMatchPairs, table, null); + // the operator is not added, so there is delegation + delegate = true; + } + } else { + if (existingGroupByOperatorIndex >= 0) { + groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex, + EXPOSED_GROUP_ROW_SETS.name(), mangledMatchPairs, true, false); + delegate = false; + } else { + // When we do not 
reaggregate, the next level needs access to our exposed group row sets + groupByChunkedOperator = + makeGroupByOperatorForFormula(mangledMatchPairs, table, EXPOSED_GROUP_ROW_SETS.name()); + addNoInputOperator(groupByChunkedOperator); + // we added the operator, so we cannot delegate + delegate = false; + } + } + + final IntegerSingleValueSource depthSource = new IntegerSingleValueSource(); + depthSource.set(groupByColumnNames.length); + + final FormulaMultiColumnChunkedOperator op = new FormulaMultiColumnChunkedOperator(table, + groupByChunkedOperator, delegate, selectColumn, inputKeyColumns, null, depthSource); + addNoInputOperator(op); + } + // ------------------------------------------------------------------------------------------------------------- // AggSpec.Visitor // ------------------------------------------------------------------------------------------------------------- @@ -1215,6 +1518,7 @@ IterativeChunkedAggregationOperator apply( private final class RollupReaggregatedConverter extends Converter implements RollupAggregation.Visitor, UnsupportedRollupAggregations { + private final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor; private int nextColumnIdentifier = 0; private RollupReaggregatedConverter( @@ -1222,6 +1526,14 @@ private RollupReaggregatedConverter( final boolean requireStateChangeRecorder, @NotNull final String... 
groupByColumnNames) { super(table, requireStateChangeRecorder, groupByColumnNames); + this.compilationProcessor = QueryCompilerRequestProcessor.batch(); + } + + @Override + AggregationContext build() { + final AggregationContext resultContext = super.build(); + compilationProcessor.compile(); + return resultContext; } // ------------------------------------------------------------------------------------------------------------- @@ -1265,6 +1577,111 @@ public void visit(@NotNull final Partition partition) { addNoInputOperator(partitionOperator); } + @Override + public void visit(AggSpecGroup group) { + final ColumnSource groupRowSet = table.getColumnSource(EXPOSED_GROUP_ROW_SETS.name()); + final MatchPair[] pairs = new MatchPair[resultPairs.size()]; + for (int ii = 0; ii < resultPairs.size(); ++ii) { + pairs[ii] = new MatchPair(resultPairs.get(ii).output().name(), resultPairs.get(ii).output().name()); + } + final int existingGroupByOperatorIndex = existingGroupByReaggregateIndex(); + if (existingGroupByOperatorIndex >= 0) { + final GroupByReaggregateOperator existing = ensureGroupByReaggregateOperator(table, + existingGroupByOperatorIndex, EXPOSED_GROUP_ROW_SETS.name(), pairs); + addNoInputOperator(existing.resultExtractor(resultPairs)); + } else { + addOperator(new GroupByReaggregateOperator(table, true, EXPOSED_GROUP_ROW_SETS.name(), null, pairs), + groupRowSet, + EXPOSED_GROUP_ROW_SETS.name()); + } + } + + @Override + public void visit(Formula formula) { + final SelectColumn selectColumn = SelectColumn.of(formula.selectable()); + + // Get or create a column definition map composed of vectors of the original column types (or scalars when + // part of the key columns). 
+ final Set groupByColumnSet = Set.of(groupByColumnNames); + + // for a reaggregated formula, we can't use the input definition as is; we want to use the definition from + // the source table; but tack on our rollup depth column + AggregationProcessor thisProcessor = AggregationProcessor.this; + final TableDefinition sourceDefinition = ((WithSource) thisProcessor).source.getDefinition(); + maybeInitializeVectorColumns(groupByColumnSet, sourceDefinition, Map.of(ROLLUP_FORMULA_DEPTH.name(), + ColumnDefinition.of(ROLLUP_FORMULA_DEPTH.name(), IntType.of()))); + + // Get the input column names from the formula and provide them to the groupBy operator + final String[] allInputColumns = + selectColumn.initDef(vectorColumnDefinitions, compilationProcessor).toArray(String[]::new); + + final Map> partitioned = Arrays.stream(allInputColumns) + .collect(Collectors.partitioningBy( + o -> groupByColumnSet.contains(o) || ROLLUP_FORMULA_DEPTH.name().equals(o))); + final String[] inputKeyColumns = partitioned.get(true).toArray(String[]::new); + final String[] inputNonKeyColumns = partitioned.get(false).toArray(String[]::new); + + validateSelectColumnForFormula(selectColumn); + + final Map renames = new HashMap<>(); + final MatchPair[] groupPairs = new MatchPair[inputNonKeyColumns.length]; + + for (int ii = 0; ii < inputNonKeyColumns.length; ++ii) { + final String mangledColumn = inputNonKeyColumns[ii] + ROLLUP_GRP_COLUMN_ID + ROLLUP_COLUMN_SUFFIX; + if (table.hasColumns(mangledColumn)) { + groupPairs[ii] = new MatchPair(mangledColumn, mangledColumn); + renames.put(mangledColumn, inputNonKeyColumns[ii]); + } else { + // reagg uses the output name + groupPairs[ii] = new MatchPair(mangledColumn, inputNonKeyColumns[ii]); + // we are not changing the input column name, so don't need the rename + renames.put(inputNonKeyColumns[ii], inputNonKeyColumns[ii]); + } + } + + final IntegerSingleValueSource depthSource = new IntegerSingleValueSource(); + 
depthSource.set(groupByColumnNames.length); + + if (formula.reaggregateAggregatedValues()) { + GroupByChunkedOperator groupByOperator; + + final int existingIndex = existingGroupByOperatorIndex(); + if (existingIndex >= 0) { + groupByOperator = ensureGroupByOperator(table, existingIndex, null, groupPairs, true, true); + } else { + final List hiddenPairs = + Arrays.stream(groupPairs).map(mp -> mp.left().name()).collect(Collectors.toList()); + groupByOperator = new GroupByChunkedOperator(table, false, null, hiddenPairs, groupPairs); + } + + // everything gets hidden + final FormulaMultiColumnChunkedOperator op = + new FormulaMultiColumnChunkedOperator(table, groupByOperator, + true, selectColumn, inputKeyColumns, renames, depthSource); + + addOperator(op, null, inputNonKeyColumns); + } else { + final ColumnSource groupRowSet = table.getColumnSource(EXPOSED_GROUP_ROW_SETS.name()); + GroupByReaggregateOperator groupByOperator; + + final int existingIndex = existingGroupByReaggregateIndex(); + if (existingIndex >= 0) { + groupByOperator = ensureGroupByReaggregateOperator(table, existingIndex, + EXPOSED_GROUP_ROW_SETS.name(), groupPairs); + } else { + groupByOperator = + new GroupByReaggregateOperator(table, true, EXPOSED_GROUP_ROW_SETS.name(), null, + groupPairs); + addOperator(groupByOperator, groupRowSet, EXPOSED_GROUP_ROW_SETS.name()); + } + + final FormulaMultiColumnChunkedOperator op = + new FormulaMultiColumnChunkedOperator(table, groupByOperator, + false, selectColumn, inputKeyColumns, renames, depthSource); + addOperator(op, groupRowSet, EXPOSED_GROUP_ROW_SETS.name()); + } + } + // ------------------------------------------------------------------------------------------------------------- // AggSpec.Visitor // ------------------------------------------------------------------------------------------------------------- @@ -1583,7 +2000,7 @@ private static AggregationContext makeExposedGroupRowSetAggregationContext( // noinspection unchecked return new 
AggregationContext( new IterativeChunkedAggregationOperator[] { - new GroupByChunkedOperator(inputQueryTable, true, EXPOSED_GROUP_ROW_SETS.name()), + new GroupByChunkedOperator(inputQueryTable, true, EXPOSED_GROUP_ROW_SETS.name(), null), new CountAggregationOperator(null) }, new String[][] {ArrayTypeUtils.EMPTY_STRING_ARRAY, ArrayTypeUtils.EMPTY_STRING_ARRAY}, @@ -1593,7 +2010,7 @@ private static AggregationContext makeExposedGroupRowSetAggregationContext( // noinspection unchecked return new AggregationContext( new IterativeChunkedAggregationOperator[] { - new GroupByChunkedOperator(inputQueryTable, true, EXPOSED_GROUP_ROW_SETS.name()) + new GroupByChunkedOperator(inputQueryTable, true, EXPOSED_GROUP_ROW_SETS.name(), null) }, new String[][] {ArrayTypeUtils.EMPTY_STRING_ARRAY}, new ChunkSource.WithPrev[] {null}, @@ -2159,4 +2576,14 @@ public static AggregationRowLookup getRowLookup(@NotNull final Table aggregation Assert.neqNull(value, "aggregation result row lookup"); return (AggregationRowLookup) value; } + + private static class WithSource extends AggregationProcessor { + private final @NotNull Table source; + + private WithSource(@NotNull Collection aggregations, @NotNull Type type, + @NotNull Table source) { + super(aggregations, type); + this.source = source; + } + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/BaseBlinkFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/BaseBlinkFirstOrLastChunkedOperator.java index 9b5cee1f077..0e505e3c901 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/BaseBlinkFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/BaseBlinkFirstOrLastChunkedOperator.java @@ -98,11 +98,12 @@ public final boolean requiresRowKeys() { @Override @OverridingMethodsMustInvokeSuper - public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + public boolean 
resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { if ((redirections = cachedRedirections.get()) == null) { cachedRedirections = new SoftReference<>(redirections = new LongArraySource()); ensureCapacity(startingDestinationsCount); } + return false; } // ----------------------------------------------------------------------------------------------------------------- diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ByteBlinkSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ByteBlinkSortedFirstOrLastChunkedOperator.java index d4520da86f6..1bdd13a7bde 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ByteBlinkSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ByteBlinkSortedFirstOrLastChunkedOperator.java @@ -57,11 +57,12 @@ public void ensureCapacity(final long tableSize) { } @Override - public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { - super.resetForStep(upstream, startingDestinationsCount); + public boolean resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + final boolean modified = super.resetForStep(upstream, startingDestinationsCount); if (isCombo) { changedDestinationsBuilder = RowSetFactory.builderRandom(); } + return modified; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/CharBlinkSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/CharBlinkSortedFirstOrLastChunkedOperator.java index 092f753192a..13268ffa445 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/CharBlinkSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/CharBlinkSortedFirstOrLastChunkedOperator.java @@ -53,11 +53,12 @@ public void ensureCapacity(final 
long tableSize) { } @Override - public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { - super.resetForStep(upstream, startingDestinationsCount); + public boolean resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + final boolean modified = super.resetForStep(upstream, startingDestinationsCount); if (isCombo) { changedDestinationsBuilder = RowSetFactory.builderRandom(); } + return modified; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java index 234b3be7ef9..b876e1347ea 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java @@ -537,7 +537,7 @@ private TableUpdate computeDownstreamIndicesAndCopyKeys( @NotNull final ModifiedColumnSet resultModifiedColumnSet, @NotNull final UnaryOperator[] resultModifiedColumnSetFactories) { final int firstStateToAdd = outputPosition.get(); - ac.resetOperatorsForStep(upstream, firstStateToAdd); + ac.resetOperatorsForStep(upstream, firstStateToAdd, modifiedOperators); if (upstream.removed().isNonempty()) { doRemoves(upstream.removed()); @@ -2085,7 +2085,9 @@ public void onUpdate(@NotNull final TableUpdate upstream) { } private void processNoKeyUpdate(@NotNull final TableUpdate upstream) { - ac.resetOperatorsForStep(upstream, 1); + final boolean[] modifiedOperators = new boolean[ac.size()]; + + ac.resetOperatorsForStep(upstream, 1, modifiedOperators); final ModifiedColumnSet upstreamModifiedColumnSet = upstream.modified().isEmpty() ? 
ModifiedColumnSet.EMPTY @@ -2099,7 +2101,6 @@ private void processNoKeyUpdate(@NotNull final TableUpdate upstream) { ac.initializeSingletonContexts(opContexts, upstream, od.operatorsWithModifiedInputColumns); - final boolean[] modifiedOperators = new boolean[ac.size()]; // remove all the removals if (upstream.removed().isNonempty()) { doNoKeyRemoval(upstream.removed(), ac, opContexts, allColumns, modifiedOperators); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/DoubleBlinkSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/DoubleBlinkSortedFirstOrLastChunkedOperator.java index 14822045a1b..bff986e9440 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/DoubleBlinkSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/DoubleBlinkSortedFirstOrLastChunkedOperator.java @@ -57,11 +57,12 @@ public void ensureCapacity(final long tableSize) { } @Override - public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { - super.resetForStep(upstream, startingDestinationsCount); + public boolean resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + final boolean modified = super.resetForStep(upstream, startingDestinationsCount); if (isCombo) { changedDestinationsBuilder = RowSetFactory.builderRandom(); } + return modified; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FloatBlinkSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FloatBlinkSortedFirstOrLastChunkedOperator.java index e5a8bc53e42..7bb38240ca2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FloatBlinkSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FloatBlinkSortedFirstOrLastChunkedOperator.java @@ -57,11 +57,12 @@ public 
void ensureCapacity(final long tableSize) { } @Override - public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { - super.resetForStep(upstream, startingDestinationsCount); + public boolean resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + final boolean modified = super.resetForStep(upstream, startingDestinationsCount); if (isCombo) { changedDestinationsBuilder = RowSetFactory.builderRandom(); } + return modified; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaChunkedOperator.java index 176a14a28f6..fe9bb782d03 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaChunkedOperator.java @@ -39,8 +39,8 @@ */ class FormulaChunkedOperator implements IterativeChunkedAggregationOperator { - private final GroupByChunkedOperator groupBy; - private final boolean delegateToBy; + private GroupByChunkedOperator groupBy; + private boolean delegateToBy; private final String[] inputColumnNames; private final String[] resultColumnNames; @@ -301,12 +301,13 @@ public UnaryOperator initializeRefreshing(@NotNull final Quer } @Override - public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + public boolean resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { if (delegateToBy) { groupBy.resetForStep(upstream, startingDestinationsCount); } updateUpstreamModifiedColumnSet = upstream.modified().isEmpty() ? 
ModifiedColumnSet.EMPTY : upstream.modifiedColumnSet(); + return false; } @Override @@ -494,4 +495,9 @@ private boolean[] makeObjectOrModifiedColumnsMask(@NotNull final ModifiedColumnS } return columnsMask; } + + public void updateGroupBy(GroupByChunkedOperator groupBy, boolean delegateToBy) { + this.groupBy = groupBy; + this.delegateToBy = delegateToBy; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java index 2bf75160fdf..7cdffaba562 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java @@ -3,6 +3,7 @@ // package io.deephaven.engine.table.impl.by; +import io.deephaven.base.verify.Assert; import io.deephaven.chunk.*; import io.deephaven.chunk.attributes.ChunkLengths; import io.deephaven.chunk.attributes.ChunkPositions; @@ -19,11 +20,14 @@ import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource; import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.UnaryOperator; +import java.util.stream.Collectors; import static io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource.BLOCK_SIZE; @@ -34,11 +38,15 @@ class FormulaMultiColumnChunkedOperator implements IterativeChunkedAggregationOp private final QueryTable inputTable; - private final GroupByChunkedOperator groupBy; - private final boolean delegateToBy; + private GroupByOperator groupBy; + private boolean delegateToBy; private final SelectColumn selectColumn; private final WritableColumnSource resultColumn; private final String[] inputKeyColumns; + @Nullable + private final 
ColumnSource formulaDepthSource; + @Nullable + private final Map renames; private ChunkSource formulaDataSource; @@ -60,18 +68,24 @@ class FormulaMultiColumnChunkedOperator implements IterativeChunkedAggregationOp * be false if {@code groupBy} is updated by the helper, or if this is not the first operator sharing * {@code groupBy}. * @param selectColumn The formula column that will produce the results + * @param renames a map from input names in the groupBy operator (i.e. mangled names) to input column names in the + * formula */ FormulaMultiColumnChunkedOperator( @NotNull final QueryTable inputTable, - @NotNull final GroupByChunkedOperator groupBy, + @NotNull final GroupByOperator groupBy, final boolean delegateToBy, @NotNull final SelectColumn selectColumn, - @NotNull final String[] inputKeyColumns) { + @NotNull final String[] inputKeyColumns, + @Nullable Map renames, + @Nullable final ColumnSource formulaDepthSource) { this.inputTable = inputTable; this.groupBy = groupBy; this.delegateToBy = delegateToBy; this.selectColumn = selectColumn; this.inputKeyColumns = inputKeyColumns; + this.renames = renames; + this.formulaDepthSource = formulaDepthSource; resultColumn = ArrayBackedColumnSource.getMemoryColumnSource( 0, selectColumn.getReturnedType(), selectColumn.getReturnedComponentType()); @@ -199,7 +213,7 @@ public boolean modifyRowKeys(final SingletonContext context, @Override public boolean requiresRowKeys() { - return delegateToBy; + return delegateToBy && groupBy.requiresRowKeys(); } @Override @@ -222,14 +236,31 @@ public void propagateInitialState(@NotNull final QueryTable resultTable, int sta } final Map> sourceColumns; - if (inputKeyColumns.length == 0) { + if (inputKeyColumns.length == 0 && formulaDepthSource == null && renames == null) { // noinspection unchecked sourceColumns = (Map>) groupBy.getInputResultColumns(); } else { final Map> columnSourceMap = resultTable.getColumnSourceMap(); - sourceColumns = new HashMap<>(groupBy.getInputResultColumns()); + 
sourceColumns = new HashMap<>(groupBy.getInputResultColumns().size() + 1); + for (Map.Entry> entry : groupBy.getInputResultColumns().entrySet()) { + final String columnName = entry.getKey(); + final String renamed; + if (renames != null && (renamed = renames.get(columnName)) != null) { + sourceColumns.put(renamed, entry.getValue()); + } else { + sourceColumns.put(columnName, entry.getValue()); + } + } Arrays.stream(inputKeyColumns).forEach(col -> sourceColumns.put(col, columnSourceMap.get(col))); + sourceColumns.put(AggregationProcessor.ROLLUP_FORMULA_DEPTH.name(), formulaDepthSource); } + final List missingColumns = selectColumn.getColumns().stream() + .filter(column -> !sourceColumns.containsKey(column)).collect(Collectors.toList()); + if (!missingColumns.isEmpty()) { + throw new IllegalStateException( + "Columns " + missingColumns + " not found, available columns are: " + sourceColumns.keySet()); + } + selectColumn.initInputs(resultTable.getRowSet(), sourceColumns); formulaDataSource = selectColumn.getDataView(); @@ -263,8 +294,7 @@ public UnaryOperator initializeRefreshing(@NotNull final Quer final String[] inputColumnNames = selectColumn.getColumns().toArray(String[]::new); final ModifiedColumnSet inputMCS = inputTable.newModifiedColumnSet(inputColumnNames); return inputToResultModifiedColumnSetFactory = input -> { - if (groupBy.getSomeKeyHasAddsOrRemoves() || - (groupBy.getSomeKeyHasModifies() && input.containsAny(inputMCS))) { + if (groupBy.hasModifications(input.containsAny(inputMCS))) { return resultMCS; } return ModifiedColumnSet.EMPTY; @@ -272,12 +302,13 @@ public UnaryOperator initializeRefreshing(@NotNull final Quer } @Override - public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + public boolean resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { if (delegateToBy) { groupBy.resetForStep(upstream, startingDestinationsCount); } updateUpstreamModifiedColumnSet = 
upstream.modified().isEmpty() ? ModifiedColumnSet.EMPTY : upstream.modifiedColumnSet(); + return false; } @Override @@ -408,4 +439,9 @@ public void close() { private static long calculateContainingBlockLastKey(final long firstKey) { return (firstKey / BLOCK_SIZE) * BLOCK_SIZE + BLOCK_SIZE - 1; } + + public void updateGroupBy(GroupByOperator groupBy, boolean delegateToBy) { + this.groupBy = groupBy; + this.delegateToBy = delegateToBy; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java index f5d28a99731..a15610c4fe6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java @@ -3,6 +3,7 @@ // package io.deephaven.engine.table.impl.by; +import io.deephaven.api.Pair; import io.deephaven.base.verify.Assert; import io.deephaven.chunk.attributes.ChunkLengths; import io.deephaven.chunk.attributes.ChunkPositions; @@ -22,9 +23,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.*; import java.util.function.UnaryOperator; import static io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource.BLOCK_SIZE; @@ -33,7 +32,7 @@ * An {@link IterativeChunkedAggregationOperator} used in the implementation of {@link Table#groupBy}, * {@link io.deephaven.api.agg.spec.AggSpecGroup}, and {@link io.deephaven.api.agg.Aggregation#AggGroup(String...)}. 
*/ -public final class GroupByChunkedOperator implements IterativeChunkedAggregationOperator { +public final class GroupByChunkedOperator implements GroupByOperator { private final QueryTable inputTable; private final boolean registeredWithHelper; @@ -44,8 +43,8 @@ public final class GroupByChunkedOperator implements IterativeChunkedAggregation private final ObjectArraySource addedBuilders; private final ObjectArraySource removedBuilders; - private final String[] inputColumnNames; private final Map> inputAggregatedColumns; + private final String[] inputColumnNamesForResults; private final Map> resultAggregatedColumns; private final ModifiedColumnSet aggregationInputsModifiedColumnSet; @@ -56,14 +55,33 @@ public final class GroupByChunkedOperator implements IterativeChunkedAggregation private boolean someKeyHasModifies; private boolean initialized; + private MatchPair[] aggregatedColumnPairs; + private List hiddenResults; + + /** + * Create a GroupedByChunkedOperator, which produces a column of {@link io.deephaven.vector.Vector vectors} for each + * of the input columns. + * + * @param inputTable the table we are aggregating + * @param registeredWithHelper true if we are registered with the helper (meaning we independently produce result + * columns), false otherwise. For a normal AggGroup this is true; for a group-by that is only part of an + * AggFormula this is false. + * @param exposeRowSetsAs the name of the column to expose the rowsets for each group as + * @param hiddenResults a list (possibly empty) of columns that are not exposed to the helper; or null if all + * columns should be exposed + * @param aggregatedColumnPairs the list of input and output columns for this operation + */ public GroupByChunkedOperator( @NotNull final QueryTable inputTable, final boolean registeredWithHelper, @Nullable final String exposeRowSetsAs, + @Nullable final List hiddenResults, @NotNull final MatchPair... 
aggregatedColumnPairs) { this.inputTable = inputTable; this.registeredWithHelper = registeredWithHelper; this.exposeRowSetsAs = exposeRowSetsAs; + this.hiddenResults = hiddenResults; + this.aggregatedColumnPairs = aggregatedColumnPairs; live = inputTable.isRefreshing(); rowSets = new ObjectArraySource<>(WritableRowSet.class); @@ -71,19 +89,24 @@ public GroupByChunkedOperator( inputAggregatedColumns = new LinkedHashMap<>(aggregatedColumnPairs.length); resultAggregatedColumns = new LinkedHashMap<>(aggregatedColumnPairs.length); + final List inputResultNameList = new ArrayList<>(aggregatedColumnPairs.length); Arrays.stream(aggregatedColumnPairs).forEach(pair -> { final AggregateColumnSource aggregateColumnSource = AggregateColumnSource.make(inputTable.getColumnSource(pair.rightColumn()), rowSets); inputAggregatedColumns.put(pair.rightColumn(), aggregateColumnSource); - resultAggregatedColumns.put(pair.leftColumn(), aggregateColumnSource); + if (hiddenResults == null || !hiddenResults.contains(pair.output().name())) { + resultAggregatedColumns.put(pair.output().name(), aggregateColumnSource); + inputResultNameList.add(pair.input().name()); + } }); + inputColumnNamesForResults = inputResultNameList.toArray(String[]::new); if (exposeRowSetsAs != null && resultAggregatedColumns.containsKey(exposeRowSetsAs)) { throw new IllegalArgumentException(String.format( "Exposing group RowSets as %s, but this conflicts with a requested grouped output column name", exposeRowSetsAs)); } - inputColumnNames = MatchPair.getRightColumns(aggregatedColumnPairs); + final String[] inputColumnNames = MatchPair.getRightColumns(aggregatedColumnPairs); if (live) { aggregationInputsModifiedColumnSet = inputTable.newModifiedColumnSet(inputColumnNames); removedBuilders = new ObjectArraySource<>(Object.class); @@ -392,9 +415,7 @@ public void ensureCapacity(final long tableSize) { return resultAggregatedColumns; } - /** - * Get a map from input column names to the corresponding output {@link 
ColumnSource}. - */ + @Override public Map> getInputResultColumns() { return inputAggregatedColumns; } @@ -416,6 +437,7 @@ public UnaryOperator initializeRefreshing( initializeNewRowSetPreviousValues(resultTable.getRowSet()); return registeredWithHelper ? new InputToResultModifiedColumnSetFactory(resultTable, + inputColumnNamesForResults, resultAggregatedColumns.keySet().toArray(String[]::new)) : null; } @@ -430,7 +452,7 @@ public UnaryOperator initializeRefreshing( UnaryOperator makeInputToResultModifiedColumnSetFactory( @NotNull final QueryTable resultTable, @NotNull final String[] resultColumnNames) { - return new InputToResultModifiedColumnSetFactory(resultTable, resultColumnNames); + return new InputToResultModifiedColumnSetFactory(resultTable, inputColumnNamesForResults, resultColumnNames); } private class InputToResultModifiedColumnSetFactory implements UnaryOperator { @@ -441,6 +463,7 @@ private class InputToResultModifiedColumnSetFactory implements UnaryOperator getHiddenResults() { + return hiddenResults; + } + + private class ResultExtractor implements IterativeChunkedAggregationOperator { + final Map> resultColumns; + final String[] inputColumnNames; + + private ResultExtractor(Map> resultColumns, String[] inputColumnNames) { + this.resultColumns = resultColumns; + this.inputColumnNames = inputColumnNames; + } + + @Override + public Map> getResultColumns() { + return resultColumns; + } + + @Override + public void addChunk(BucketedContext context, Chunk values, + LongChunk inputRowKeys, IntChunk destinations, + IntChunk startPositions, IntChunk length, + WritableBooleanChunk stateModified) {} + + @Override + public void removeChunk(BucketedContext context, Chunk values, + LongChunk inputRowKeys, IntChunk destinations, + IntChunk startPositions, IntChunk length, + WritableBooleanChunk stateModified) {} + + @Override + public boolean addChunk(SingletonContext context, int chunkSize, Chunk values, + LongChunk inputRowKeys, long destination) { + return 
false; + } + + @Override + public boolean removeChunk(SingletonContext context, int chunkSize, Chunk values, + LongChunk inputRowKeys, long destination) { + return false; + } + + @Override + public void ensureCapacity(long tableSize) {} + + @Override + public void startTrackingPrevValues() {} + + @Override + public UnaryOperator initializeRefreshing(@NotNull QueryTable resultTable, + @NotNull LivenessReferent aggregationUpdateListener) { + return new InputToResultModifiedColumnSetFactory(resultTable, + inputColumnNames, + resultColumns.keySet().toArray(String[]::new)); + } + } + + @NotNull + public IterativeChunkedAggregationOperator resultExtractor(List resultPairs) { + final List inputColumnNamesList = new ArrayList<>(resultPairs.size()); + final Map> resultColumns = new LinkedHashMap<>(resultPairs.size()); + for (final Pair pair : resultPairs) { + final String inputName = pair.input().name(); + inputColumnNamesList.add(inputName); + resultColumns.put(pair.output().name(), inputAggregatedColumns.get(inputName)); + } + return new ResultExtractor(resultColumns, inputColumnNamesList.toArray(String[]::new)); + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByOperator.java new file mode 100644 index 00000000000..df900929f68 --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByOperator.java @@ -0,0 +1,23 @@ +// +// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending +// +package io.deephaven.engine.table.impl.by; + +import io.deephaven.engine.table.ColumnSource; + +import java.util.Map; + +public interface GroupByOperator extends IterativeChunkedAggregationOperator { + /** + * Get a map from input column names to the corresponding output {@link ColumnSource}. + */ + Map> getInputResultColumns(); + + /** + * Determine whether to propagate changes when input columns have been modified. 
+     *
+     * @param columnsModified have any of the input columns been modified (as per the MCS)?
+     * @return true if we have modified our output (e.g., because of additions or modifications).
+     */
+    boolean hasModifications(final boolean columnsModified);
+}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
new file mode 100644
index 00000000000..779edd80380
--- /dev/null
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
@@ -0,0 +1,591 @@
+//
+// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.engine.table.impl.by;
+
+import io.deephaven.api.Pair;
+import io.deephaven.base.verify.Assert;
+import io.deephaven.chunk.*;
+import io.deephaven.chunk.attributes.ChunkLengths;
+import io.deephaven.chunk.attributes.ChunkPositions;
+import io.deephaven.chunk.attributes.Values;
+import io.deephaven.engine.liveness.LivenessReferent;
+import io.deephaven.engine.rowset.*;
+import io.deephaven.engine.rowset.chunkattributes.RowKeys;
+import io.deephaven.engine.table.*;
+import io.deephaven.engine.table.impl.MatchPair;
+import io.deephaven.engine.table.impl.QueryTable;
+import io.deephaven.engine.table.impl.sources.ObjectArraySource;
+import io.deephaven.engine.table.impl.sources.aggregate.AggregateColumnSource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.*;
+import java.util.function.UnaryOperator;
+
+import static io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource.BLOCK_SIZE;
+
+/**
+ * An {@link IterativeChunkedAggregationOperator} used to re-aggregate the results of an
+ * {@link io.deephaven.api.agg.Aggregation#AggGroup(String...) AggGroup} as part of a rollup.
+ *
+ *
+ * The operator is fundamentally different than the {@link GroupByChunkedOperator}. Rather than examining row keys, it
+ * listens to the rollup's base (or intermediate) level and reads the exposed RowSet column. The relevant RowSets are
+ * added to a random builder for each state while processing an update (or initialization). At the end of the update
+ * cycle, it builds the rowsets and updates an internal ObjectArraySource of RowSets.
+ *
+ *
+ *
+ * The resulting column sources are once again {@link AggregateColumnSource}, which reuse the wrapped aggregated column
+ * source from the source table (thus each level of the rollup uses the original table's sources as the input to the
+ * AggregateColumnSources -- not the immediately prior level).
+ *
+ */ +public final class GroupByReaggregateOperator implements GroupByOperator { + + private final QueryTable inputTable; + private final boolean registeredWithHelper; + private final String exposeRowSetsAs; + private final MatchPair[] aggregatedColumnPairs; + private final List hiddenResults; + + private final boolean live; + private final ObjectArraySource rowSets; + private final ObjectArraySource addedBuilders; + private final ObjectArraySource removedBuilders; + + private final String[] inputColumnNamesForResults; + private final ModifiedColumnSet inputAggregatedColumnsModifiedColumnSet; + + private final Map> inputAggregatedColumns; + private final Map> resultAggregatedColumns; + + private RowSetBuilderRandom stepDestinationsModified; + private boolean rowsetsModified = false; + + private boolean initialized; + + public GroupByReaggregateOperator( + @NotNull final QueryTable inputTable, + final boolean registeredWithHelper, + @Nullable final String exposeRowSetsAs, + @Nullable List hiddenResults, + @NotNull final MatchPair... 
aggregatedColumnPairs) { + this.inputTable = inputTable; + this.registeredWithHelper = registeredWithHelper; + this.exposeRowSetsAs = exposeRowSetsAs; + this.hiddenResults = hiddenResults; + this.aggregatedColumnPairs = aggregatedColumnPairs; + + if (exposeRowSetsAs == null) { + throw new IllegalArgumentException("Must expose group RowSets for rollup."); + } + + live = inputTable.isRefreshing(); + rowSets = new ObjectArraySource<>(WritableRowSet.class); + addedBuilders = new ObjectArraySource<>(Object.class); + + inputAggregatedColumns = new LinkedHashMap<>(aggregatedColumnPairs.length); + resultAggregatedColumns = new LinkedHashMap<>(aggregatedColumnPairs.length); + final List inputColumnNamesForResultsList = new ArrayList<>(); + Arrays.stream(aggregatedColumnPairs).forEach(pair -> { + // we are reaggregating so have to use the left column for everything + final ColumnSource source = inputTable.getColumnSource(pair.leftColumn()); + if (!(source instanceof AggregateColumnSource)) { + throw new IllegalStateException("Expect to reaggregate AggregateColumnSources for a group operation."); + } + @SuppressWarnings("rawtypes") + final ColumnSource realSource = ((AggregateColumnSource) source).getAggregatedSource(); + final AggregateColumnSource aggregateColumnSource = AggregateColumnSource.make(realSource, rowSets); + if (hiddenResults == null || !hiddenResults.contains(pair.output().name())) { + resultAggregatedColumns.put(pair.output().name(), aggregateColumnSource); + inputColumnNamesForResultsList.add(pair.input().name()); + } + inputAggregatedColumns.put(pair.input().name(), aggregateColumnSource); + }); + + inputAggregatedColumnsModifiedColumnSet = + inputTable.newModifiedColumnSet(inputAggregatedColumns.keySet().toArray(String[]::new)); + + if (resultAggregatedColumns.containsKey(exposeRowSetsAs)) { + throw new IllegalArgumentException(String.format( + "Exposing group RowSets as %s, but this conflicts with a requested grouped output column name", + 
exposeRowSetsAs)); + } + inputColumnNamesForResults = inputColumnNamesForResultsList.toArray(String[]::new); + removedBuilders = live ? new ObjectArraySource<>(Object.class) : null; + initialized = false; + } + + @Override + public void addChunk(final BucketedContext bucketedContext, final Chunk values, + final LongChunk inputRowKeys, + @NotNull final IntChunk destinations, + @NotNull final IntChunk startPositions, + @NotNull final IntChunk length, + @NotNull final WritableBooleanChunk stateModified) { + for (int ii = 0; ii < startPositions.size(); ++ii) { + final int startPosition = startPositions.get(ii); + final int runLength = length.get(ii); + final long destination = destinations.get(startPosition); + addChunk(values.asObjectChunk(), startPosition, runLength, destination); + } + stateModified.fillWithValue(0, startPositions.size(), true); + } + + @Override + public void removeChunk(final BucketedContext bucketedContext, final Chunk values, + final LongChunk inputRowKeys, + @NotNull final IntChunk destinations, @NotNull final IntChunk startPositions, + @NotNull final IntChunk length, @NotNull final WritableBooleanChunk stateModified) { + for (int ii = 0; ii < startPositions.size(); ++ii) { + final int startPosition = startPositions.get(ii); + final int runLength = length.get(ii); + final long destination = destinations.get(startPosition); + removeChunk(values.asObjectChunk(), startPosition, runLength, destination); + } + stateModified.fillWithValue(0, startPositions.size(), true); + } + + @Override + public void modifyChunk(final BucketedContext bucketedContext, + final Chunk previousValues, + final Chunk newValues, + final LongChunk postShiftRowKeys, + @NotNull final IntChunk destinations, + @NotNull final IntChunk startPositions, + @NotNull final IntChunk length, @NotNull final WritableBooleanChunk stateModified) { + for (int ii = 0; ii < startPositions.size(); ++ii) { + final int startPosition = startPositions.get(ii); + final int runLength = 
length.get(ii); + final long destination = destinations.get(startPosition); + modifyChunk(previousValues.asObjectChunk(), newValues.asObjectChunk(), startPosition, runLength, + destination); + } + stateModified.fillWithValue(0, startPositions.size(), true); + } + + @Override + public void shiftChunk(final BucketedContext bucketedContext, final Chunk previousValues, + final Chunk newValues, + @NotNull final LongChunk preShiftRowKeys, + @NotNull final LongChunk postShiftRowKeys, + @NotNull final IntChunk destinations, @NotNull final IntChunk startPositions, + @NotNull final IntChunk length, @NotNull final WritableBooleanChunk stateModified) { + throw new IllegalStateException("GroupByReaggregateOperator should never be called with shiftChunk"); + } + + @Override + public boolean addChunk(final SingletonContext singletonContext, final int chunkSize, + final Chunk values, + @NotNull final LongChunk inputRowKeys, + final long destination) { + addChunk(values.asObjectChunk(), 0, chunkSize, destination); + return true; + } + + @Override + public boolean removeChunk(final SingletonContext singletonContext, final int chunkSize, + final Chunk values, + @NotNull final LongChunk inputRowKeys, final long destination) { + removeChunk(values.asObjectChunk(), 0, chunkSize, destination); + return true; + } + + @Override + public boolean modifyChunk(final SingletonContext singletonContext, final int chunkSize, + final Chunk previousValues, final Chunk newValues, + final LongChunk postShiftRowKeys, + final long destination) { + modifyChunk(previousValues.asObjectChunk(), newValues.asObjectChunk(), 0, chunkSize, destination); + return true; + } + + @Override + public boolean shiftChunk(final SingletonContext singletonContext, final Chunk previousValues, + final Chunk newValues, + @NotNull final LongChunk preShiftRowKeys, + @NotNull final LongChunk postShiftRowKeys, + final long destination) { + // we don't need to deal with these yet + throw new IllegalStateException( + 
"Reaggregations should not require shifts, as aggregations have fixed output slots."); + } + + private void addChunk(@NotNull final ObjectChunk rowSets, final int start, + final int length, + final long destination) { + if (length == 0) { + return; + } + accumulateToBuilderRandom(addedBuilders, rowSets, start, length, destination, false); + if (stepDestinationsModified != null) { + stepDestinationsModified.addKey(destination); + } + } + + private void removeChunk(@NotNull final ObjectChunk rowSets, final int start, + final int length, + final long destination) { + if (length == 0) { + return; + } + accumulateToBuilderRandom(removedBuilders, rowSets, start, length, destination, false); + stepDestinationsModified.addKey(destination); + } + + private void modifyChunk(ObjectChunk previousValues, + ObjectChunk newValues, + int start, + int length, + long destination) { + if (length == 0) { + return; + } + + accumulateToBuilderRandom(removedBuilders, previousValues, start, length, destination, true); + accumulateToBuilderRandom(addedBuilders, newValues, start, length, destination, false); + + stepDestinationsModified.addKey(destination); + } + + private static void accumulateToBuilderRandom(@NotNull final ObjectArraySource builderColumn, + @NotNull final ObjectChunk rowSetsToAdd, + final int start, final int length, final long destination, + final boolean previous) { + RowSetBuilderRandom builder = (RowSetBuilderRandom) builderColumn.getUnsafe(destination); + if (builder == null) { + builderColumn.set(destination, builder = RowSetFactory.builderRandom()); + } + // add the keys to the stored builder + for (int ii = 0; ii < length; ++ii) { + RowSet rowSet = rowSetsToAdd.get(start + ii); + if (previous) { + builder.addRowSet(rowSet.trackingCast().prev()); + } else { + builder.addRowSet(rowSet); + } + } + } + + private static WritableRowSet extractAndClearBuilderRandom( + @NotNull final WritableObjectChunk builderChunk, + final int offset) { + final RowSetBuilderRandom 
builder = builderChunk.get(offset); + if (builder != null) { + final WritableRowSet rowSet = builder.build(); + builderChunk.set(offset, null); + return rowSet; + } + return null; + } + + private static WritableRowSet nullToEmpty(@Nullable final WritableRowSet rowSet) { + return rowSet == null ? RowSetFactory.empty() : rowSet; + } + + @Override + public void ensureCapacity(final long tableSize) { + rowSets.ensureCapacity(tableSize); + addedBuilders.ensureCapacity(tableSize); + if (live) { + removedBuilders.ensureCapacity(tableSize); + } + } + + @Override + public Map> getResultColumns() { + final Map> allResultColumns = + new LinkedHashMap<>(resultAggregatedColumns.size() + 1); + allResultColumns.put(exposeRowSetsAs, rowSets); + allResultColumns.putAll(resultAggregatedColumns); + return allResultColumns; + } + + @Override + public void startTrackingPrevValues() { + // NB: We don't need previous tracking on the rowSets ColumnSource, even if it's exposed. It's in destination + // space, and we never move anything. Nothing should be asking for previous values if they didn't exist + // previously. + // NB: These are usually (always, as of now) instances of AggregateColumnSource, meaning + // startTrackingPrevValues() is a no-op. + inputAggregatedColumns.values().forEach(ColumnSource::startTrackingPrevValues); + } + + @Override + public UnaryOperator initializeRefreshing( + @NotNull final QueryTable resultTable, + @NotNull final LivenessReferent aggregationUpdateListener) { + initializeNewRowSetPreviousValues(resultTable.getRowSet()); + return registeredWithHelper + ? 
new InputToResultModifiedColumnSetFactory(resultTable, + inputColumnNamesForResults, + resultAggregatedColumns.keySet().toArray(String[]::new)) + : null; + } + + @Override + public Map> getInputResultColumns() { + return inputAggregatedColumns; + } + + @Override + public boolean hasModifications(boolean columnsModified) { + return columnsModified || rowsetsModified; + } + + private class InputToResultModifiedColumnSetFactory implements UnaryOperator { + + private final ModifiedColumnSet updateModifiedColumnSet; + private final ModifiedColumnSet.Transformer aggregatedColumnsTransformer; + + private InputToResultModifiedColumnSetFactory( + @NotNull final QueryTable resultTable, + @NotNull final String[] inputColumnNames, + @NotNull final String[] resultAggregatedColumnNames) { + updateModifiedColumnSet = new ModifiedColumnSet(resultTable.getModifiedColumnSetForUpdates()); + + final String[] allInputs = Arrays.copyOf(inputColumnNames, inputColumnNames.length + 1); + allInputs[allInputs.length - 1] = exposeRowSetsAs; + final ModifiedColumnSet[] affectedColumns = new ModifiedColumnSet[allInputs.length]; + for (int ci = 0; ci < inputColumnNames.length; ++ci) { + affectedColumns[ci] = resultTable.newModifiedColumnSet(resultAggregatedColumnNames[ci]); + } + affectedColumns[allInputs.length - 1] = resultTable.newModifiedColumnSet(allInputs); + + aggregatedColumnsTransformer = inputTable.newModifiedColumnSetTransformer(allInputs, affectedColumns); + } + + @Override + public ModifiedColumnSet apply(@NotNull final ModifiedColumnSet upstreamModifiedColumnSet) { + aggregatedColumnsTransformer.clearAndTransform(upstreamModifiedColumnSet, updateModifiedColumnSet); + return updateModifiedColumnSet; + } + } + + @Override + public boolean resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + stepDestinationsModified = new BitmapRandomBuilder(startingDestinationsCount); + rowsetsModified = false; + return 
upstream.modifiedColumnSet().containsAny(inputAggregatedColumnsModifiedColumnSet); + } + + @Override + public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) { + Assert.neqTrue(initialized, "initialized"); + + // use the builders to create the initial rowsets + try (final RowSet initialDestinations = RowSetFactory.flat(startingDestinationsCount); + final ResettableWritableObjectChunk rowSetResettableChunk = + ResettableWritableObjectChunk.makeResettableChunk(); + final ResettableWritableObjectChunk addedBuildersResettableChunk = + ResettableWritableObjectChunk.makeResettableChunk(); + final RowSequence.Iterator destinationsIterator = + initialDestinations.getRowSequenceIterator()) { + + final WritableObjectChunk rowSetBackingChunk = + rowSetResettableChunk.asWritableObjectChunk(); + final WritableObjectChunk addedBuildersBackingChunk = + addedBuildersResettableChunk.asWritableObjectChunk(); + + while (destinationsIterator.hasMore()) { + final long firstSliceDestination = destinationsIterator.peekNextKey(); + final long firstBackingChunkDestination = + rowSets.resetWritableChunkToBackingStore(rowSetResettableChunk, firstSliceDestination); + addedBuilders.resetWritableChunkToBackingStore(addedBuildersResettableChunk, firstSliceDestination); + + final long lastBackingChunkDestination = + firstBackingChunkDestination + rowSetBackingChunk.size() - 1; + final RowSequence initialDestinationsSlice = + destinationsIterator.getNextRowSequenceThrough(lastBackingChunkDestination); + + initialDestinationsSlice.forAllRowKeys((final long destination) -> { + final int backingChunkOffset = + Math.toIntExact(destination - firstBackingChunkDestination); + final WritableRowSet addRowSet = nullToEmpty( + extractAndClearBuilderRandom(addedBuildersBackingChunk, backingChunkOffset)); + rowSetBackingChunk.set(backingChunkOffset, live ? 
addRowSet.toTracking() : addRowSet); + }); + } + } + initialized = true; + } + + @Override + public void propagateUpdates(@NotNull final TableUpdate downstream, @NotNull final RowSet newDestinations) { + // get the rowset for the updated items + try (final WritableRowSet stepDestinations = stepDestinationsModified.build()) { + // add the new destinations so a rowset will get created if it doesn't exist + stepDestinations.insert(newDestinations); + + if (stepDestinations.isEmpty()) { + return; + } + + // use the builders to modify the rowsets + try (final ResettableWritableObjectChunk rowSetResettableChunk = + ResettableWritableObjectChunk.makeResettableChunk(); + final ResettableWritableObjectChunk addedBuildersResettableChunk = + ResettableWritableObjectChunk.makeResettableChunk(); + final ResettableWritableObjectChunk removedBuildersResettableChunk = + ResettableWritableObjectChunk.makeResettableChunk(); + final RowSequence.Iterator destinationsIterator = + stepDestinations.getRowSequenceIterator()) { + + final WritableObjectChunk rowSetBackingChunk = + rowSetResettableChunk.asWritableObjectChunk(); + final WritableObjectChunk addedBuildersBackingChunk = + addedBuildersResettableChunk.asWritableObjectChunk(); + final WritableObjectChunk removedBuildersBackingChunk = + removedBuildersResettableChunk.asWritableObjectChunk(); + + while (destinationsIterator.hasMore()) { + final long firstSliceDestination = destinationsIterator.peekNextKey(); + final long firstBackingChunkDestination = + rowSets.resetWritableChunkToBackingStore(rowSetResettableChunk, firstSliceDestination); + addedBuilders.resetWritableChunkToBackingStore(addedBuildersResettableChunk, + firstSliceDestination); + removedBuilders.resetWritableChunkToBackingStore(removedBuildersResettableChunk, + firstSliceDestination); + + final long lastBackingChunkDestination = + firstBackingChunkDestination + rowSetBackingChunk.size() - 1; + final RowSequence initialDestinationsSlice = + 
destinationsIterator.getNextRowSequenceThrough(lastBackingChunkDestination); + + initialDestinationsSlice.forAllRowKeys((final long destination) -> { + final int backingChunkOffset = + Math.toIntExact(destination - firstBackingChunkDestination); + final WritableRowSet workingRowSet = rowSetBackingChunk.get(backingChunkOffset); + if (workingRowSet == null) { + // use the addRowSet as the new rowset + final WritableRowSet addRowSet = nullToEmpty( + extractAndClearBuilderRandom(addedBuildersBackingChunk, backingChunkOffset)); + if (!addRowSet.isEmpty()) { + rowsetsModified = true; + } + rowSetBackingChunk.set(backingChunkOffset, live ? addRowSet.toTracking() : addRowSet); + } else { + try (final WritableRowSet addRowSet = + nullToEmpty(extractAndClearBuilderRandom(addedBuildersBackingChunk, + backingChunkOffset)); + final WritableRowSet removeRowSet = + nullToEmpty(extractAndClearBuilderRandom(removedBuildersBackingChunk, + backingChunkOffset))) { + workingRowSet.remove(removeRowSet); + workingRowSet.insert(addRowSet); + if (!addRowSet.isEmpty() || !removeRowSet.isEmpty()) { + rowsetsModified = true; + } + } + } + }); + } + } + stepDestinationsModified = null; + } + initializeNewRowSetPreviousValues(newDestinations); + } + + private void initializeNewRowSetPreviousValues(@NotNull final RowSequence newDestinations) { + if (newDestinations.isEmpty()) { + return; + } + try (final ChunkSource.GetContext rowSetsGetContext = rowSets.makeGetContext(BLOCK_SIZE); + final RowSequence.Iterator newDestinationsIterator = newDestinations.getRowSequenceIterator()) { + while (newDestinationsIterator.hasMore()) { + final long nextDestination = newDestinationsIterator.peekNextKey(); + final long nextBlockEnd = (nextDestination / BLOCK_SIZE) * BLOCK_SIZE + BLOCK_SIZE - 1; + // This RowSequence slice should be exactly aligned to a slice of a single data block in rowsets (since + // it is an ArrayBackedColumnSource), allowing getChunk to skip a copy. 
+ final RowSequence newDestinationsSlice = + newDestinationsIterator.getNextRowSequenceThrough(nextBlockEnd); + final ObjectChunk rowSetsChunk = + rowSets.getChunk(rowSetsGetContext, newDestinationsSlice).asObjectChunk(); + final int rowSetsChunkSize = rowSetsChunk.size(); + for (int ii = 0; ii < rowSetsChunkSize; ++ii) { + rowSetsChunk.get(ii).initializePreviousValue(); + } + } + } + } + + + public String getExposedRowSetsAs() { + return exposeRowSetsAs; + } + + public MatchPair[] getAggregatedColumnPairs() { + return aggregatedColumnPairs; + } + + public List getHiddenResults() { + return hiddenResults; + } + + private class ResultExtractor implements IterativeChunkedAggregationOperator { + final Map> resultColumns; + final String[] inputColumnNames; + + private ResultExtractor(Map> resultColumns, String[] inputColumnNames) { + this.resultColumns = resultColumns; + this.inputColumnNames = inputColumnNames; + } + + @Override + public Map> getResultColumns() { + return resultColumns; + } + + @Override + public void addChunk(BucketedContext context, Chunk values, + LongChunk inputRowKeys, IntChunk destinations, + IntChunk startPositions, IntChunk length, + WritableBooleanChunk stateModified) {} + + @Override + public void removeChunk(BucketedContext context, Chunk values, + LongChunk inputRowKeys, IntChunk destinations, + IntChunk startPositions, IntChunk length, + WritableBooleanChunk stateModified) {} + + @Override + public boolean addChunk(SingletonContext context, int chunkSize, Chunk values, + LongChunk inputRowKeys, long destination) { + return false; + } + + @Override + public boolean removeChunk(SingletonContext context, int chunkSize, Chunk values, + LongChunk inputRowKeys, long destination) { + return false; + } + + @Override + public void ensureCapacity(long tableSize) {} + + @Override + public void startTrackingPrevValues() {} + + @Override + public UnaryOperator initializeRefreshing(@NotNull QueryTable resultTable, + @NotNull LivenessReferent 
aggregationUpdateListener) { + return new InputToResultModifiedColumnSetFactory(resultTable, + inputColumnNames, + resultColumns.keySet().toArray(String[]::new)); + } + } + + @NotNull + public IterativeChunkedAggregationOperator resultExtractor(List resultPairs) { + final List inputColumnNamesList = new ArrayList<>(resultPairs.size()); + final Map> resultColumns = new LinkedHashMap<>(resultPairs.size()); + for (final Pair pair : resultPairs) { + final String inputName = pair.input().name(); + inputColumnNamesList.add(inputName); + resultColumns.put(pair.output().name(), inputAggregatedColumns.get(inputName)); + } + return new ResultExtractor(resultColumns, inputColumnNamesList.toArray(String[]::new)); + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IntBlinkSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IntBlinkSortedFirstOrLastChunkedOperator.java index b1c748aafaa..55b80c3724b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IntBlinkSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IntBlinkSortedFirstOrLastChunkedOperator.java @@ -57,11 +57,12 @@ public void ensureCapacity(final long tableSize) { } @Override - public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { - super.resetForStep(upstream, startingDestinationsCount); + public boolean resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + final boolean modified = super.resetForStep(upstream, startingDestinationsCount); if (isCombo) { changedDestinationsBuilder = RowSetFactory.builderRandom(); } + return modified; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IterativeChunkedAggregationOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IterativeChunkedAggregationOperator.java index 
40881244dab..9630178e9a1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IterativeChunkedAggregationOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IterativeChunkedAggregationOperator.java @@ -301,11 +301,15 @@ default UnaryOperator initializeRefreshing(@NotNull final Que /** * Reset any per-step internal state. Note that the arguments to this method should not be mutated in any way. - * + * * @param upstream The upstream ShiftAwareListener.Update * @param startingDestinationsCount The number of used destinations at the beginning of this step + * @return true if this operator must generate modifications on this cycle; typically an operator returns false and + * depends on the actual add/remove/modify/shift calls to determine modifications */ - default void resetForStep(@NotNull TableUpdate upstream, int startingDestinationsCount) {} + default boolean resetForStep(@NotNull TableUpdate upstream, int startingDestinationsCount) { + return false; + } /** * Perform any internal state keeping needed for destinations that were added (went from 0 keys to > 0), removed diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/LongBlinkSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/LongBlinkSortedFirstOrLastChunkedOperator.java index 1e4fb6f0503..2c13e8eeda7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/LongBlinkSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/LongBlinkSortedFirstOrLastChunkedOperator.java @@ -57,11 +57,12 @@ public void ensureCapacity(final long tableSize) { } @Override - public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { - super.resetForStep(upstream, startingDestinationsCount); + public boolean resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + final boolean 
modified = super.resetForStep(upstream, startingDestinationsCount); if (isCombo) { changedDestinationsBuilder = RowSetFactory.builderRandom(); } + return modified; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ObjectBlinkSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ObjectBlinkSortedFirstOrLastChunkedOperator.java index 96ce7fc5e0e..708c5d9c8e8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ObjectBlinkSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ObjectBlinkSortedFirstOrLastChunkedOperator.java @@ -57,11 +57,12 @@ public void ensureCapacity(final long tableSize) { } @Override - public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { - super.resetForStep(upstream, startingDestinationsCount); + public boolean resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + final boolean modified = super.resetForStep(upstream, startingDestinationsCount); if (isCombo) { changedDestinationsBuilder = RowSetFactory.builderRandom(); } + return modified; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/PartitionByChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/PartitionByChunkedOperator.java index 28f34bfde8a..2166bd527f7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/PartitionByChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/PartitionByChunkedOperator.java @@ -618,7 +618,7 @@ private void linkTableReferences(@NotNull final Table subTable) { } @Override - public void resetForStep(@NotNull final TableUpdate upstream, int startingDestinationsCount) { + public boolean resetForStep(@NotNull final TableUpdate upstream, int startingDestinationsCount) { stepUpdatedDestinations = 
RowSetFactory.empty(); final boolean upstreamModified = upstream.modified().isNonempty() && upstream.modifiedColumnSet().nonempty(); if (upstreamModified) { @@ -628,6 +628,7 @@ public void resetForStep(@NotNull final TableUpdate upstream, int startingDestin } else { stepValuesModified = false; } + return false; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/RollupConstants.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/RollupConstants.java index 7d34a87adbb..0c2d3aa7bea 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/RollupConstants.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/RollupConstants.java @@ -67,4 +67,10 @@ private RollupConstants() {} * infinity count columns used in rollup aggregations. */ static final String ROLLUP_NI_COUNT_COLUMN_ID = "_NIC_"; + + /** + * Middle column name component (between source column name and {@link #ROLLUP_COLUMN_SUFFIX suffix}) for group + * columns used in support of rollup formulas. 
+ */ + static final String ROLLUP_GRP_COLUMN_ID = "_GRP_"; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ShortBlinkSortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ShortBlinkSortedFirstOrLastChunkedOperator.java index 775904c204f..171ea170c1d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ShortBlinkSortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ShortBlinkSortedFirstOrLastChunkedOperator.java @@ -57,11 +57,12 @@ public void ensureCapacity(final long tableSize) { } @Override - public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { - super.resetForStep(upstream, startingDestinationsCount); + public boolean resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + final boolean modified = super.resetForStep(upstream, startingDestinationsCount); if (isCombo) { changedDestinationsBuilder = RowSetFactory.builderRandom(); } + return modified; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/TDigestPercentileOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/TDigestPercentileOperator.java index f9cace78d97..f91e1fa6022 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/TDigestPercentileOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/TDigestPercentileOperator.java @@ -202,8 +202,9 @@ private void updateDestination(final long destination) { } @Override - public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { + public boolean resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) { modifiedThisStep = false; + return false; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/RollupTableImpl.java 
b/engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/RollupTableImpl.java index b96631dabe3..7893d758537 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/RollupTableImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/RollupTableImpl.java
@@ -42,6 +42,7 @@ import static io.deephaven.engine.rowset.RowSequence.NULL_ROW_KEY; import static io.deephaven.engine.table.impl.AbsoluteSortColumnConventions.*; import static io.deephaven.engine.table.impl.BaseTable.shouldCopyAttribute; +import static io.deephaven.engine.table.impl.by.AggregationProcessor.EXPOSED_GROUP_ROW_SETS; import static io.deephaven.engine.table.impl.by.AggregationProcessor.getRowLookup; import static io.deephaven.engine.table.impl.by.AggregationRowLookup.DEFAULT_UNKNOWN_ROW; import static io.deephaven.engine.table.impl.by.AggregationRowLookup.EMPTY_KEY;
@@ -272,7 +273,7 @@ public RollupTable withFilter(@NotNull final Filter filter) { final AggregationRowLookup[] levelRowLookups = makeLevelRowLookupsArray(numLevels, filteredBaseLevelRowLookup); final ColumnSource<Table>[] levelNodeTableSources = makeLevelNodeTableSourcesArray( numLevels, filteredBaseLevel.getColumnSource(ROLLUP_COLUMN.name(), Table.class)); - rollupFromBase(levelTables, levelRowLookups, levelNodeTableSources, aggregations, groupByColumns); + rollupFromBase(levelTables, levelRowLookups, levelNodeTableSources, aggregations, groupByColumns, source); final WhereFilter[] newFilters; if (rollupKeyFilters == null) {
@@ -589,7 +590,7 @@ public static RollupTable makeRollup( final AggregationRowLookup[] levelRowLookups = makeLevelRowLookupsArray(numLevels, getRowLookup(baseLevel)); final ColumnSource<Table>[] levelNodeTableSources = makeLevelNodeTableSourcesArray( numLevels, baseLevel.getColumnSource(ROLLUP_COLUMN.name(), Table.class)); - rollupFromBase(levelTables, levelRowLookups, levelNodeTableSources, aggregations, groupByColumns); + rollupFromBase(levelTables, levelRowLookups, levelNodeTableSources, aggregations, groupByColumns, source); return new RollupTableImpl( attributes,
@@ -670,13 +671,15 @@ private static ColumnSource<Table>[] makeLevelNodeTableSourcesArray( * already filled * @param aggregations The aggregations * @param groupByColumns The group-by columns + * @param source the source table for the rollup */ private static void rollupFromBase( @NotNull final QueryTable[] levelTables, @NotNull final AggregationRowLookup[] levelRowLookups, @NotNull final ColumnSource<Table>[] levelNodeTableSources, @NotNull final Collection aggregations, - @NotNull final Collection groupByColumns) { + @NotNull final Collection groupByColumns, + @NotNull final QueryTable source) { final Deque columnsToReaggregateBy = new ArrayDeque<>(groupByColumns); final Deque nullColumnNames = new ArrayDeque<>(groupByColumns.size()); int lastLevelIndex = levelTables.length - 1;
@@ -688,7 +691,7 @@ private static void rollupFromBase( nullColumnNames.stream().map(lastLevelDefinition::getColumn).collect(Collectors.toList()); lastLevel = lastLevel.aggNoMemo( - AggregationProcessor.forRollupReaggregated(aggregations, nullColumns, ROLLUP_COLUMN), + AggregationProcessor.forRollupReaggregated(aggregations, nullColumns, ROLLUP_COLUMN, source), false, null, new ArrayList<>(columnsToReaggregateBy)); --lastLevelIndex; levelTables[lastLevelIndex] = lastLevel;
@@ -700,7 +703,8 @@ private static void rollupFromBase( private static Stream> filterRollupInternalColumns( @NotNull final Stream> columnDefinitions) { - return columnDefinitions.filter(cd -> !cd.getName().endsWith(ROLLUP_COLUMN_SUFFIX)); + return columnDefinitions.filter(cd -> !cd.getName().endsWith(ROLLUP_COLUMN_SUFFIX) + && !cd.getName().equals(EXPOSED_GROUP_ROW_SETS.name())); } @Override
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/AggregateColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/AggregateColumnSource.java index 46b75652af1..7f40094a90c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/AggregateColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/AggregateColumnSource.java
@@ -22,6 +22,13 @@ public interface AggregateColumnSource, UngroupedColumnSource ungrouped(); + /** + * Get the underlying source that is aggregated by this ColumnSource.
+ * + * @return the underlying source that is aggregated by this ColumnSource + */ + ColumnSource getAggregatedSource(); + static , DATA_TYPE> AggregateColumnSource make( @NotNull final ColumnSource aggregatedSource, @NotNull final ColumnSource groupRowSetSource) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/BaseAggregateColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/BaseAggregateColumnSource.java index 92208f64a0e..4da56a40bff 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/BaseAggregateColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/BaseAggregateColumnSource.java @@ -290,4 +290,9 @@ public boolean isStateless() { public boolean isImmutable() { return aggregatedSource.isImmutable() && groupRowSetSource.isImmutable(); } + + @Override + public ColumnSource getAggregatedSource() { + return aggregatedSource; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/BaseAggregateSlicedColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/BaseAggregateSlicedColumnSource.java index cac0f4b742f..9f19bbd8d13 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/BaseAggregateSlicedColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/BaseAggregateSlicedColumnSource.java @@ -381,4 +381,9 @@ public boolean isImmutable() { && (startSource == null || startSource.isImmutable()) && (endSource == null || endSource.isImmutable()); } + + @Override + public ColumnSource getAggregatedSource() { + return aggregatedSource; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/RangeAggregateColumnSource.java 
b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/RangeAggregateColumnSource.java index 66525b669de..aea9ea46d31 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/RangeAggregateColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/RangeAggregateColumnSource.java @@ -346,4 +346,9 @@ public boolean isImmutable() { && startPositionsInclusive.isImmutable() && endPositionsExclusive.isImmutable(); } + + @Override + public ColumnSource getAggregatedSource() { + return aggregated; + } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestAggBy.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestAggBy.java index 6cd7f5db7ea..dcea413a1ce 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestAggBy.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestAggBy.java @@ -62,6 +62,27 @@ public void setUp() throws Exception { super.setUp(); } + @Test + public void testDoubleFormula() { + ColumnHolder aHolder = col("A", 0, 0, 1, 1, 0, 0, 1, 1, 0, 0); + ColumnHolder bHolder = col("B", 1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + ColumnHolder cHolder = col("C", 1, 1, 1, 1, 1, 1, 1, 1, 1, 1); + Table table = TableTools.newTable(aHolder, bHolder, cHolder); + show(table); + assertEquals(10, table.size()); + assertEquals(2, table.groupBy("A").size()); + + Table minMax = table.aggBy( + List.of( + AggFormula("f_const=6.0 + 3"), + AggFormula("f_max=max(B)"), + AggFormula("f_sum_two_col=sum(B) + sum(C)")), + "A"); + show(minMax); + + assertEquals(2, minMax.size()); + } + @Test public void testBy() { ColumnHolder aHolder = col("A", 0, 0, 1, 1, 0, 0, 1, 1, 0, 0); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestAggGroup.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestAggGroup.java new file mode 100644 index 00000000000..9f830ad623f --- /dev/null +++ 
b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestAggGroup.java @@ -0,0 +1,125 @@ +// +// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending +// +package io.deephaven.engine.table.impl; + +import io.deephaven.api.ColumnName; +import io.deephaven.api.agg.Aggregation; +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.rowset.RowSetShiftData; +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.ModifiedColumnSet; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.by.AggregationProcessor; +import io.deephaven.engine.testutil.ControlledUpdateGraph; +import io.deephaven.engine.testutil.TstUtils; +import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; +import io.deephaven.engine.util.TableTools; +import io.deephaven.test.types.OutOfBandTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.*; + +import static io.deephaven.api.agg.Aggregation.*; +import static io.deephaven.engine.table.impl.by.RollupConstants.ROLLUP_COLUMN_SUFFIX; +import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; +import static io.deephaven.engine.testutil.TstUtils.i; +import static io.deephaven.engine.util.TableTools.*; + +@Category(OutOfBandTest.class) +public class TestAggGroup extends RefreshingTableTestCase { + @Test + public void testGroupModifications() { + final QueryTable source = TstUtils.testRefreshingTable( + stringCol("Key1", "Alpha", "Bravo", "Alpha", "Charlie", "Charlie", "Bravo", "Bravo"), + stringCol("Key2", "Delta", "Delta", "Echo", "Echo", "Echo", "Echo", "Echo"), + intCol("Sentinel", 1, 2, 3, 4, 5, 6, 7), + intCol("Sentinel2", 101, 102, 103, 104, 105, 106, 107)); + + final List aggs = + List.of(AggGroup("Sentinel"), AggSum("Sum=Sentinel"), AggGroup("Sentinel2"), AggSum("Sum2=Sentinel2")); + + final QueryTable normal = source.aggNoMemo(AggregationProcessor.forAggregation(aggs), false, 
null, + List.of(ColumnName.of("Key1"))); + final QueryTable normalNoKey = + source.aggNoMemo(AggregationProcessor.forAggregation(aggs), false, null, List.of()); + final ColumnName rollupColumn = ColumnName.of(ROLLUP_COLUMN_SUFFIX); + final QueryTable base = source.aggNoMemo(AggregationProcessor.forRollupBase(aggs, false, rollupColumn), false, + null, List.of(ColumnName.of("Key1"), ColumnName.of("Key2"))); + final QueryTable reaggregated = base.aggNoMemo(AggregationProcessor.forRollupReaggregated(aggs, + List.of(ColumnDefinition.ofString("Key2")), rollupColumn, source), false, null, + List.of(ColumnName.of("Key1"))); + final QueryTable reaggregated2 = reaggregated.aggNoMemo(AggregationProcessor.forRollupReaggregated(aggs, + List.of(ColumnDefinition.ofString("Key1"), ColumnDefinition.ofString("Key2")), rollupColumn, source), + false, null, + List.of()); + + TableTools.show(normal); + TableTools.show(base); + TableTools.show(reaggregated); + TableTools.show(reaggregated2); + + doCheck(normal, base, reaggregated, normalNoKey, reaggregated2); + + final SimpleListener normalListener = new SimpleListener(normal); + normal.addUpdateListener(normalListener); + final SimpleListener baseListener = new SimpleListener(base); + base.addUpdateListener(baseListener); + final SimpleListener reaggListener = new SimpleListener(reaggregated); + reaggregated.addUpdateListener(reaggListener); + final SimpleListener reaggListener2 = new SimpleListener(reaggregated2); + reaggregated2.addUpdateListener(reaggListener2); + + final ControlledUpdateGraph cug = ExecutionContext.getContext().getUpdateGraph().cast(); + // modify the value of a Sentinel; check the updates + cug.runWithinUnitTestCycle(() -> { + TstUtils.addToTable(source, i(0), stringCol("Key1", "Alpha"), stringCol("Key2", "Delta"), + intCol("Sentinel", 8), intCol("Sentinel2", 101)); + final ModifiedColumnSet mcs = source.getModifiedColumnSetForUpdates(); + mcs.clear(); + mcs.setAll("Sentinel"); + source.notifyListeners(new 
TableUpdateImpl(i(), i(), i(0), RowSetShiftData.EMPTY, mcs)); + }); + + TableTools.show(normal); + TableTools.show(base); + TableTools.show(reaggregated); + + // make sure the aggregation is still consistent + doCheck(normal, base, reaggregated, normalNoKey, reaggregated2); + + // we should have gotten an update from each of our listeners + checkModified(normalListener, normal, "Sentinel", "Sentinel2"); + checkModified(baseListener, base, "Sentinel", "Sentinel2"); + checkModified(reaggListener, reaggregated, "Sentinel", "Sentinel2"); + checkModified(reaggListener2, reaggregated2, "Sentinel", "Sentinel2"); + } + + private static void checkModified(SimpleListener listener, QueryTable table, final String modColumn, + final String noModColumn) { + System.out.println("update = " + listener.update); + assertEquals(1, listener.count); + assertTrue(listener.update.added().isEmpty()); + assertTrue(listener.update.removed().isEmpty()); + assertEquals(1, listener.update.modified().size()); + assertTrue(listener.update.modifiedColumnSet().containsAll(table.newModifiedColumnSet(modColumn))); + assertFalse(listener.update.modifiedColumnSet().containsAny(table.newModifiedColumnSet(noModColumn))); + } + + private static void doCheck(Table normal, QueryTable base, QueryTable reaggregated, QueryTable normalNoKey, + QueryTable reaggregated2) { + assertEquals(0, normal.update("CheckSum=sum(Sentinel)", "CheckSum2=sum(Sentinel2)") + .where("Sum != CheckSum || Sum2 != CheckSum2").size()); + assertEquals(0, base.update("CheckSum=sum(Sentinel)", "CheckSum2=sum(Sentinel2)") + .where("Sum != CheckSum || Sum2 != CheckSum2").size()); + assertEquals(0, reaggregated.update("CheckSum=sum(Sentinel)", "CheckSum2=sum(Sentinel2)") + .where("Sum != CheckSum || Sum2 != CheckSum2").size()); + assertTableEquals(normal.view("Key1", "Sentinel", "Sum", "Sentinel2", "Sum2"), + reaggregated.view("Key1", "Sentinel", "Sum", "Sentinel2", "Sum2")); + + assertTableEquals(normalNoKey.view("Sentinel", "Sum", 
"Sentinel2", "Sum2"), + reaggregated2.view("Sentinel", "Sum", "Sentinel2", "Sum2")); + + } +} diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java index ebdfd3ecb31..18b5993f5f5 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java @@ -8,10 +8,14 @@ import io.deephaven.api.agg.Aggregation; import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.rowset.RowSetShiftData; +import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.hierarchical.HierarchicalTable; import io.deephaven.engine.table.hierarchical.RollupTable; +import io.deephaven.engine.table.impl.util.ColumnHolder; import io.deephaven.engine.testutil.ColumnInfo; +import io.deephaven.engine.testutil.ControlledUpdateGraph; import io.deephaven.engine.testutil.EvalNuggetInterface; import io.deephaven.engine.testutil.TstUtils; import io.deephaven.engine.testutil.generator.IntGenerator; @@ -22,14 +26,14 @@ import io.deephaven.test.types.OutOfBandTest; import io.deephaven.vector.IntVector; import io.deephaven.vector.IntVectorDirect; +import io.deephaven.vector.LongVector; +import io.deephaven.vector.LongVectorDirect; +import org.jspecify.annotations.NonNull; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Random; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -62,7 +66,8 @@ public class TestRollupTable extends RefreshingTableTestCase { AggUnique("unique=intCol"), AggVar("var=intCol"), AggWAvg("intCol", "wavg=intCol"), - 
AggWSum("intCol", "wsum=intCol")); + AggWSum("intCol", "wsum=intCol"), + AggGroup("grp=intCol")); // Companion list of columns to compare between rollup root and the zero-key equivalent private final String[] columnsToCompare = new String[] { @@ -83,7 +88,8 @@ public class TestRollupTable extends RefreshingTableTestCase { "unique", "var", "wavg", - "wsum" + "wsum", + "grp" }; /** @@ -339,4 +345,328 @@ public void testVectorKeyColumn() { snapshot); freeSnapshotTableChunks(snapshot); } + + @Test + public void testRollupGroupStatic() { + final Table source = TableTools.newTable( + stringCol("Key1", "Alpha", "Bravo", "Alpha", "Charlie", "Charlie", "Bravo", "Bravo"), + stringCol("Key2", "Delta", "Delta", "Echo", "Echo", "Echo", "Echo", "Echo"), + intCol("Sentinel", 1, 2, 3, 4, 5, 6, 7)); + + final RollupTable rollup1 = + source.rollup(List.of(AggGroup("Sentinel"), AggSum("Sum=Sentinel")), "Key1", "Key2"); + + final String[] arrayWithNull = new String[1]; + final Table keyTable = newTable( + intCol(rollup1.getRowDepthColumn().name(), 0), + stringCol("Key1", arrayWithNull), + stringCol("Key2", arrayWithNull), + byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL)); + + final HierarchicalTable.SnapshotState ss1 = rollup1.makeSnapshotState(); + final Table snapshot = + snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30)); + TableTools.showWithRowSet(snapshot); + + final Table expected = initialExpectedGrouped(rollup1); + assertTableEquals(expected, snapshot); + freeSnapshotTableChunks(snapshot); + } + + @Test + public void testRollupFormulaStatic() { + testRollupFormulaStatic(false); + testRollupFormulaStatic(true); + } + + private void testRollupFormulaStatic(boolean withGroup) { + final Table source = TableTools.newTable( + stringCol("Key1", "Alpha", "Bravo", "Alpha", "Charlie", "Charlie", "Bravo", "Bravo"), + stringCol("Key2", "Delta", "Delta", "Echo", "Echo", "Echo", "Echo", "Echo"), + intCol("Sentinel", 1, 2, 3, 4, 
5, 6, 7)); + TableTools.show(source); + + final List aggList = new ArrayList<>(); + if (withGroup) { + aggList.add(AggGroup("Sentinel")); + } + aggList.add(AggSum("Sum=Sentinel")); + aggList.add(AggFormula("FSum", "__FORMULA_DEPTH__ == 0 ? max(Sentinel) : 1 + sum(Sentinel)")); + + final RollupTable rollup1 = + source.rollup( + aggList, + "Key1", "Key2"); + + final String[] arrayWithNull = new String[1]; + final Table keyTable = newTable( + intCol(rollup1.getRowDepthColumn().name(), 0), + stringCol("Key1", arrayWithNull), + stringCol("Key2", arrayWithNull), + byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL)); + + final HierarchicalTable.SnapshotState ss1 = rollup1.makeSnapshotState(); + final Table snapshot = + snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30)); + TableTools.showWithRowSet(snapshot); + + TableTools.show(snapshot.view(rollup1.getRowDepthColumn().name(), rollup1.getRowExpandedColumn().name(), "Key1", + "Key2", "Sum", "FSum")); + + final Table expectedBase = initialExpectedGrouped(rollup1); + final Table expectedSentinel = withGroup ? expectedBase : expectedBase.dropColumns("Sentinel"); + final Table expected = expectedSentinel.update("FSum=ii == 0 ? 7 : 1 + Sum"); + assertTableEquals(expected, snapshot); + freeSnapshotTableChunks(snapshot); + } + + @Test + public void testRollupFormulaStatic2() { + final Table source = TableTools.newTable( + stringCol("Account", "acct1", "acct1", "acct2", "acct2"), + stringCol("Sym", "leg1", "leg2", "leg1", "leg2"), + intCol("qty", 100, 100, 200, 200), + doubleCol("Dollars", 1000, -500, 2000, -1000)); + + final RollupTable rollup1 = + source.updateView("qty=(long)qty").rollup( + List.of(AggFormula("qty", "__FORMULA_DEPTH__ > 0 ? 
first(qty) : sum(qty)").asReaggregating(), + AggSum("Dollars")), + "Account", "Sym"); + + final String[] arrayWithNull = new String[1]; + final Table keyTable = newTable( + intCol(rollup1.getRowDepthColumn().name(), 0), + stringCol("Account", arrayWithNull), + stringCol("Sym", arrayWithNull), + byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL)); + + final HierarchicalTable.SnapshotState ss1 = rollup1.makeSnapshotState(); + final Table snapshot = + snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30)); + + final Table expected = TableTools.newTable(intCol(rollup1.getRowDepthColumn().name(), 1, 2, 3, 3, 2, 3, 3), + booleanCol(rollup1.getRowExpandedColumn().name(), true, true, null, null, true, null, null), + col("Account", null, "acct1", "acct1", "acct1", "acct2", "acct2", "acct2"), + col("Sym", null, null, "leg1", "leg2", null, "leg1", "leg2"), + longCol("qty", 300, 100, 100, 100, 200, 200, 200), + doubleCol("Dollars", 1500, 500, 1000, -500, 1000, 2000, -1000)); + + assertTableEquals(expected, snapshot); + freeSnapshotTableChunks(snapshot); + } + + @Test + public void testRollupFormulaStatic3() { + testRollupFormulaStatic3(false); + testRollupFormulaStatic3(true); + } + + private void testRollupFormulaStatic3(boolean hasGroup) { + final Table source = TableTools.newTable( + stringCol("Account", "Aardvark", "Aardvark", "Aardvark", "Aardvark", "Badger", "Badger", "Badger", + "Cobra", "Cobra", "Cobra", "Cobra"), + stringCol("Sym", "Apple", "Banana", "Apple", "Apple", "Carrot", "Carrot", "Carrot", "Apple", "Apple", + "Apple", "Dragonfruit"), + longCol("qty", 500, 100, 500, 200, 300, 300, 200, 100, 200, 300, 1500)); + TableTools.show(source); + + final List aggList = new ArrayList<>(); + + if (hasGroup) { + aggList.add(AggGroup("gqty=qty")); + } + aggList.add(AggFormula("qty", "__FORMULA_DEPTH__ == 2 ? 
min(1000, sum(qty)) : sum(qty)").asReaggregating()); + aggList.add(AggSum("sqty=qty")); + + final RollupTable rollup1 = + source.rollup( + aggList, + "Account", "Sym"); + + final RollupTable rollup2 = rollup1.withNodeOperations( + rollup1.makeNodeOperationsRecorder(RollupTable.NodeType.Aggregated).updateView("SumDiff=sqty-qty")); + + final String[] arrayWithNull = new String[1]; + final Table keyTable = newTable( + intCol(rollup1.getRowDepthColumn().name(), 0), + stringCol("Account", arrayWithNull), + stringCol("Sym", arrayWithNull), + byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL)); + + final HierarchicalTable.SnapshotState ss1 = rollup2.makeSnapshotState(); + final Table snapshot = + snapshotToTable(rollup2, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30)); + TableTools.showWithRowSet(snapshot); + + final List> columnHolders = new ArrayList<>(); + columnHolders.add(intCol(rollup1.getRowDepthColumn().name(), 1, 2, 3, 3, 2, 3, 2, 3, 3)); + columnHolders.add(booleanCol(rollup1.getRowExpandedColumn().name(), true, true, null, null, true, null, true, + null, null)); + columnHolders.add(stringCol("Account", null, "Aardvark", "Aardvark", "Aardvark", "Badger", "Badger", "Cobra", + "Cobra", "Cobra")); + columnHolders + .add(stringCol("Sym", null, null, "Apple", "Banana", null, "Carrot", null, "Apple", "Dragonfruit")); + columnHolders.add(col("gqty", lv(500, 100, 500, 200, 300, 300, 200, 100, 200, 300, 1500), + /* aardvark */ lv(500, 100, 500, 200), lv(500, 500, 200), lv(100), /* badger */lv(300, 300, 200), + lv(300, 300, 200), /* cobra */ lv(100, 200, 300, 1500), lv(100, 200, 300), lv(1500))); + columnHolders.add(longCol("qty", 3500, /* aardvark */ 1100, 1000, 100, /* badger */800, 800, /* cobra */ 1600, + 600, 1000)); + final Table expected = TableTools.newTable(columnHolders.toArray(ColumnHolder[]::new)) + .update("sqty = sum(gqty)", "SumDiff=sqty-qty"); + + TableTools.show(expected); + + assertTableEquals(hasGroup ? 
expected : expected.dropColumns("gqty"), snapshot); + + freeSnapshotTableChunks(snapshot); + } + + @Test + public void testRollupFormulaGroupRenames() { + final int[] allValues = {10, 10, 10, 20, 20, 30, 30}; + final Table source = newTable( + stringCol("Key", "Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Charlie", "Charlie"), + intCol("Value", allValues)); + final RollupTable simpleSum = + source.rollup(List.of(AggGroup("Values=Value"), AggFormula("Sum = sum(Value)")), "Key"); + + final String[] arrayWithNull = new String[1]; + final Table keyTable = newTable( + intCol(simpleSum.getRowDepthColumn().name(), 0), + stringCol("Key", arrayWithNull), + byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL)); + + final HierarchicalTable.SnapshotState ss1 = simpleSum.makeSnapshotState(); + final Table snapshot = + snapshotToTable(simpleSum, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30)); + TableTools.showWithRowSet(snapshot); + + assertTableEquals(TableTools.newTable(intCol(simpleSum.getRowDepthColumn().name(), 1, 2, 2, 2), + booleanCol(simpleSum.getRowExpandedColumn().name(), true, null, null, null), + stringCol("Key", null, "Alpha", "Bravo", "Charlie"), + col("Values", iv(allValues), iv(10, 10, 10), iv(20, 20), iv(30, 30)), longCol("Sum", 130, 30, 40, 60)), + snapshot); + + freeSnapshotTableChunks(snapshot); + } + + private static Table initialExpectedGrouped(RollupTable rollup1) { + return TableTools.newTable(intCol(rollup1.getRowDepthColumn().name(), 1, 2, 3, 3, 2, 3, 3, 2, 3), + booleanCol(rollup1.getRowExpandedColumn().name(), true, true, null, null, true, null, null, + true, null), + col("Key1", null, "Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Bravo", "Charlie", "Charlie"), + col("Key2", null, null, "Delta", "Echo", null, "Delta", "Echo", null, "Echo"), + col("Sentinel", iv(1, 2, 3, 4, 5, 6, 7), iv(1, 3), iv(1), iv(3), iv(2, 6, 7), iv(2), iv(6, 7), + iv(4, 5), iv(4, 5))) + .update("Sum=sum(Sentinel)"); + } + + private static 
Table secondExpectedGrouped(RollupTable rollup1) { + return TableTools.newTable(intCol(rollup1.getRowDepthColumn().name(), 1, 2, 3, 3, 2, 3, 3, 2, 3), + booleanCol(rollup1.getRowExpandedColumn().name(), true, true, null, null, true, null, null, + true, null), + col("Key1", null, "Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Bravo", "Charlie", "Charlie"), + col("Key2", null, null, "Delta", "Echo", null, "Delta", "Echo", null, "Echo"), + col("Sentinel", iv(1, 2, 3, 4, 5, 7, 8, 9), iv(1, 3, 8), iv(1), iv(3, 8), iv(2, 7), iv(2), iv(7), + iv(4, 5, 9), iv(4, 5, 9))) + .update("Sum=sum(Sentinel)"); + } + + private static @NonNull IntVector iv(final int... ints) { + return new IntVectorDirect(ints); + } + + private static @NonNull LongVector lv(final long... ints) { + return new LongVectorDirect(ints); + } + + @Test + public void testRollupGroupIncremental() { + final QueryTable source = TstUtils.testRefreshingTable( + stringCol("Key1", "Alpha", "Bravo", "Alpha", "Charlie", "Charlie", "Bravo", "Bravo"), + stringCol("Key2", "Delta", "Delta", "Echo", "Echo", "Echo", "Echo", "Echo"), + intCol("Sentinel", 1, 2, 3, 4, 5, 6, 7)); + + final RollupTable rollup1 = + source.rollup(List.of(AggGroup("Sentinel"), AggSum("Sum=Sentinel")), "Key1", "Key2"); + + final String[] arrayWithNull = new String[1]; + final Table keyTable = newTable( + intCol(rollup1.getRowDepthColumn().name(), 0), + stringCol("Key1", arrayWithNull), + stringCol("Key2", arrayWithNull), + byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL)); + + final HierarchicalTable.SnapshotState ss1 = rollup1.makeSnapshotState(); + final Table snapshot = + snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30)); + TableTools.showWithRowSet(snapshot); + + final Table expected = initialExpectedGrouped(rollup1); + assertTableEquals(expected, snapshot); + freeSnapshotTableChunks(snapshot); + + final ControlledUpdateGraph cug = source.getUpdateGraph().cast(); + 
cug.runWithinUnitTestCycle(() -> { + addToTable(source, i(10, 11), stringCol("Key1", "Alpha", "Charlie"), stringCol("Key2", "Echo", "Echo"), + intCol("Sentinel", 8, 9)); + removeRows(source, i(5)); + source.notifyListeners( + new TableUpdateImpl(i(10, 11), i(5), i(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY)); + }); + + final Table snapshot2 = + snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30)); + TableTools.showWithRowSet(snapshot2); + Table expected2 = secondExpectedGrouped(rollup1); + TableTools.showWithRowSet(expected2); + assertTableEquals(expected2, snapshot2); + freeSnapshotTableChunks(snapshot2); + } + + @Test + public void testReusedGrouping() { + final QueryTable source = TstUtils.testRefreshingTable( + stringCol("Key1", "Alpha", "Bravo", "Alpha", "Charlie", "Charlie", "Bravo", "Bravo"), + stringCol("Key2", "Delta", "Delta", "Echo", "Echo", "Echo", "Echo", "Echo"), + intCol("Sentinel", 1, 2, 3, 4, 5, 6, 7)); + + final RollupTable rollup1 = + source.rollup(List.of(AggGroup("Sentinel"), AggSum("Sum=Sentinel"), AggGroup("S2=Sentinel")), "Key1", + "Key2"); + + final String[] arrayWithNull = new String[1]; + final Table keyTable = newTable( + intCol(rollup1.getRowDepthColumn().name(), 0), + stringCol("Key1", arrayWithNull), + stringCol("Key2", arrayWithNull), + byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL)); + + final HierarchicalTable.SnapshotState ss1 = rollup1.makeSnapshotState(); + final Table snapshot = + snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30)); + TableTools.showWithRowSet(snapshot); + + final Table expected = initialExpectedGrouped(rollup1).update("S2=Sentinel"); + assertTableEquals(expected, snapshot); + freeSnapshotTableChunks(snapshot); + + final ControlledUpdateGraph cug = source.getUpdateGraph().cast(); + cug.runWithinUnitTestCycle(() -> { + addToTable(source, i(10, 11), stringCol("Key1", "Alpha", "Charlie"), stringCol("Key2", 
"Echo", "Echo"), + intCol("Sentinel", 8, 9)); + removeRows(source, i(5)); + source.notifyListeners( + new TableUpdateImpl(i(10, 11), i(5), i(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY)); + }); + + final Table snapshot2 = + snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30)); + TableTools.showWithRowSet(snapshot2); + Table expected2 = secondExpectedGrouped(rollup1).update("S2=Sentinel"); + TableTools.showWithRowSet(expected2); + assertTableEquals(expected2, snapshot2); + freeSnapshotTableChunks(snapshot2); + } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/by/AggregationProcessorTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/by/AggregationProcessorTest.java index c996fe69542..473cf9d76ce 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/by/AggregationProcessorTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/by/AggregationProcessorTest.java @@ -3,82 +3,30 @@ // package io.deephaven.engine.table.impl.by; -import io.deephaven.api.ColumnName; -import io.deephaven.api.Selectable; import io.deephaven.api.agg.Aggregation; -import io.deephaven.api.agg.Count; -import io.deephaven.api.agg.spec.AggSpec; -import io.deephaven.base.FileUtils; import io.deephaven.chunk.util.pools.ChunkPoolReleaseTracking; -import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.context.QueryScope; -import io.deephaven.engine.liveness.LivenessScopeStack; -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.RowSetFactory; -import io.deephaven.engine.rowset.RowSetShiftData; -import io.deephaven.engine.rowset.TrackingWritableRowSet; import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.*; import io.deephaven.engine.table.impl.by.ssmminmax.SsmChunkedMinMaxOperator; -import io.deephaven.engine.table.impl.indexer.DataIndexer; import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker; 
-import io.deephaven.engine.table.impl.select.IncrementalReleaseFilter; -import io.deephaven.engine.table.impl.select.SelectColumn; -import io.deephaven.engine.table.impl.select.SelectColumnFactory; -import io.deephaven.engine.table.impl.select.SourceColumn; +import io.deephaven.engine.table.impl.sources.IntegerSingleValueSource; +import io.deephaven.engine.table.impl.sources.LongSingleValueSource; import io.deephaven.engine.table.impl.sources.NullValueColumnSource; -import io.deephaven.engine.table.impl.sources.UnionRedirection; -import io.deephaven.engine.table.impl.util.ColumnHolder; -import io.deephaven.engine.table.vectors.ColumnVectors; import io.deephaven.engine.testutil.*; -import io.deephaven.engine.testutil.QueryTableTestBase.TableComparator; import io.deephaven.engine.testutil.generator.*; import io.deephaven.engine.testutil.junit4.EngineCleanup; -import io.deephaven.engine.testutil.sources.TestColumnSource; -import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; -import io.deephaven.engine.util.TableDiff; import io.deephaven.engine.util.TableTools; -import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker; -import io.deephaven.parquet.table.ParquetInstructions; -import io.deephaven.parquet.table.ParquetTools; -import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout; -import io.deephaven.test.types.OutOfBandTest; -import io.deephaven.time.DateTimeUtils; import io.deephaven.util.QueryConstants; -import io.deephaven.util.SafeCloseable; -import io.deephaven.util.mutable.MutableInt; -import io.deephaven.vector.IntVector; -import io.deephaven.vector.ObjectVector; -import junit.framework.ComparisonFailure; -import junit.framework.TestCase; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; +import io.deephaven.vector.*; import org.junit.*; -import org.junit.experimental.categories.Category; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Array; 
-import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.file.Files; + import java.time.Instant; import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.Stream; import static io.deephaven.api.agg.Aggregation.*; -import static io.deephaven.api.agg.spec.AggSpec.percentile; import static io.deephaven.engine.testutil.TstUtils.*; -import static io.deephaven.engine.util.TableTools.*; -import static io.deephaven.parquet.base.ParquetUtils.PARQUET_FILE_EXTENSION; -import static io.deephaven.util.QueryConstants.*; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.*; +import static io.deephaven.engine.util.TableTools.col; +import static io.deephaven.engine.util.TableTools.longCol; import static org.junit.Assert.assertEquals; public class AggregationProcessorTest { @@ -160,4 +108,71 @@ public void testMinMaxSecondaryTypesBoolean() { .filter(o -> o.getClass().getCanonicalName().contains(".SecondaryOperator")).count()); } + + @Test + public void testGroupReuse() { + // We should only need a single group by operator; but we want to make sure our output order is correct + final Map<String, ColumnSource<?>> csmap = new LinkedHashMap<>(); + csmap.put("Timestamp", NullValueColumnSource.getInstance(Instant.class, null)); + csmap.put("LongValue", NullValueColumnSource.getInstance(long.class, null)); + csmap.put("IntValue", NullValueColumnSource.getInstance(int.class, null)); + + final QueryTable input = new QueryTable(i(0).toTracking(), csmap); + input.setRefreshing(true); + + final List<Aggregation> aggs = + Arrays.asList(AggGroup("Timestamp"), AggGroup("LV=LongValue", "IV=IntValue"), AggGroup("TS=Timestamp")); + final Table agged = input.aggBy(aggs); + + assertEquals(ObjectVector.class, agged.getColumnSource("Timestamp").getType()); + assertEquals(Instant.class, 
agged.getColumnSource("Timestamp").getComponentType()); + assertEquals(LongVector.class, agged.getColumnSource("LV").getType()); + assertEquals(IntVector.class, agged.getColumnSource("IV").getType()); + assertEquals(ObjectVector.class, agged.getColumnSource("TS").getType()); + assertEquals(Instant.class, agged.getColumnSource("TS").getComponentType()); + + final ObjectVector<Instant> tsVec = new ObjectVectorDirect<>(new Instant[] {null}); + final LongVector longVec = new LongVectorDirect(QueryConstants.NULL_LONG); + final IntVector intVec = new IntVectorDirect(QueryConstants.NULL_INT); + final Table expected = + TableTools.newTable(col("Timestamp", tsVec), col("LV", longVec), col("IV", intVec), col("TS", tsVec)); + assertTableEquals(expected, agged); + + // this part of the test just verifies that we have the secondary operators we expect + final AggregationContext ac = AggregationProcessor.forAggregation(aggs).makeAggregationContext(input, false); + Arrays.stream(ac.operators).forEach(o -> System.out.println(o.getClass().getCanonicalName())); + assertEquals(3, ac.operators.length); + assertEquals(1, Arrays.stream(ac.operators).filter(o -> o instanceof GroupByChunkedOperator).count()); + assertEquals(2, Arrays.stream(ac.operators) + .filter(o -> o.getClass().getCanonicalName().contains("ResultExtractor")).count()); + } + + @Test + public void testFormulaGroupReuse() { + final Map<String, ColumnSource<?>> csmap = new LinkedHashMap<>(); + csmap.put("LongValue", new LongSingleValueSource()); + csmap.put("IntValue", new IntegerSingleValueSource()); + + ((LongSingleValueSource) csmap.get("LongValue")).set(10L); + ((IntegerSingleValueSource) csmap.get("IntValue")).set(20); + + final QueryTable input = new QueryTable(i(0).toTracking(), csmap); + input.setRefreshing(true); + + final List<Aggregation> aggs = + Arrays.asList(AggGroup("LongValue"), AggFormula("LS=sum(LongValue)"), AggFormula("IS=sum(IntValue)")); + final Table agged = input.aggBy(aggs); + + final LongVectorDirect lvd = new LongVectorDirect(10L); + 
assertTableEquals( + TableTools.newTable(col("LongValue", (LongVector) lvd), longCol("LS", 10L), longCol("IS", 20L)), agged); + + // this part of the test just verifies that we have the secondary operators we expect + final AggregationContext ac = AggregationProcessor.forAggregation(aggs).makeAggregationContext(input, false); + Arrays.stream(ac.operators).forEach(o -> System.out.println(o.getClass().getCanonicalName())); + assertEquals(3, ac.operators.length); + assertEquals(1, Arrays.stream(ac.operators).filter(o -> o instanceof GroupByChunkedOperator).count()); + assertEquals(2, + Arrays.stream(ac.operators).filter(o -> o instanceof FormulaMultiColumnChunkedOperator).count()); + } } diff --git a/py/server/tests/test_table.py b/py/server/tests/test_table.py index 329d6bc707e..e8f35c9d04b 100644 --- a/py/server/tests/test_table.py +++ b/py/server/tests/test_table.py @@ -93,9 +93,9 @@ def setUp(self): abs_sum(["aggAbsSum=var"]), var(["aggVar=var"]), weighted_avg("var", ["weights"]), + group(["aggGroup=var"]), ] self.aggs_not_for_rollup = [ - group(["aggGroup=var"]), partition("aggPartition"), median(["aggMed=var"]), pct(0.20, ["aggPct=var"]), diff --git a/table-api/src/main/java/io/deephaven/api/agg/AggregationDescriptions.java b/table-api/src/main/java/io/deephaven/api/agg/AggregationDescriptions.java index 1b268984c96..61df885f0d7 100644 --- a/table-api/src/main/java/io/deephaven/api/agg/AggregationDescriptions.java +++ b/table-api/src/main/java/io/deephaven/api/agg/AggregationDescriptions.java @@ -88,6 +88,7 @@ public void visit(Partition partition) { @Override public void visit(Formula formula) { - out.put(formula.column().name(), "from formula `" + Strings.of(formula.expression()) + "`"); + out.put(formula.column().name(), "from formula `" + Strings.of(formula.expression()) + "`" + + (formula.reaggregateAggregatedValues() ? 
" (reaggregates values)" : "")); } } diff --git a/table-api/src/main/java/io/deephaven/api/agg/Formula.java b/table-api/src/main/java/io/deephaven/api/agg/Formula.java index 4b78717a58a..682f39799ab 100644 --- a/table-api/src/main/java/io/deephaven/api/agg/Formula.java +++ b/table-api/src/main/java/io/deephaven/api/agg/Formula.java @@ -33,12 +33,19 @@ public static Formula of(ColumnName name, String formula) { } public static Formula of(Selectable selectable) { - return ImmutableFormula.of(selectable); + return ImmutableFormula.of(selectable, false); + } + + public Formula asReaggregating() { + return ImmutableFormula.of(selectable(), true); } @Parameter public abstract Selectable selectable(); + @Parameter + public abstract boolean reaggregateAggregatedValues(); + public ColumnName column() { return selectable().newColumn(); }