objectColumnIterator(@NotNull String co
*
*
*
+ * A blink table can be converted to an append-only table with
+ * {@link io.deephaven.engine.table.impl.BlinkTableTools#blinkToAppendOnly(io.deephaven.engine.table.Table)}.
+ *
+ *
+ *
* Some aggregations (in particular {@link #groupBy} and {@link #partitionBy} cannot provide the desired blink table
* aggregation semantics because doing so would require storing the entire stream of blink updates in memory. If
* that behavior is desired, use {@code blinkToAppendOnly}. If on the other hand, you would like to group or
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
index cb91fea14a4..e6fdc635974 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
@@ -952,11 +952,9 @@ public void visit(@NotNull final Formula formula) {
final int existingGroupByOperatorIndex = existingGroupByOperatorIndex();
if (existingGroupByOperatorIndex >= 0) {
// if we have an existing group by operator, then use it (or update it to reflect our input columns)
- final MatchPair[] matchPairs =
- Arrays.stream(inputNonKeyColumns).map(cn -> new MatchPair(cn, cn)).toArray(MatchPair[]::new);
- groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex, null, matchPairs);
+ groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex, null, makeSymmetricMatchPairs(inputNonKeyColumns));
} else {
- groupByChunkedOperator = makeGroupByOperatorForFormula(inputNonKeyColumns, table, null);
+ groupByChunkedOperator = makeGroupByOperatorForFormula(makeSymmetricMatchPairs(inputNonKeyColumns), table, null);
}
final FormulaMultiColumnChunkedOperator op = new FormulaMultiColumnChunkedOperator(table,
@@ -1146,26 +1144,34 @@ private void maybeInitializeVectorColumns(Set groupByColumnSet, final Ta
}
- private @NotNull GroupByChunkedOperator makeGroupByOperatorForFormula(String[] inputNonKeyColumns,
+ private @NotNull GroupByChunkedOperator makeGroupByOperatorForFormula(final MatchPair[] pairs,
final QueryTable table, final String exposedRowsets) {
- final MatchPair[] pairs;
- final boolean register;
- if (exposedRowsets == null) {
- register = false;
- pairs = Arrays.stream(inputNonKeyColumns).map(col -> MatchPair.of(Pair.parse(col)))
- .toArray(MatchPair[]::new);
- } else {
- register = true;
- pairs = Arrays
- .stream(inputNonKeyColumns).map(col -> MatchPair.of(
- Pair
- .of(ColumnName.of(col),
- ColumnName.of(col + ROLLUP_GRP_COLUMN_ID + ROLLUP_COLUMN_SUFFIX))))
- .toArray(MatchPair[]::new);
- }
+ final boolean register = exposedRowsets != null;
return new GroupByChunkedOperator(table, register, exposedRowsets, null, pairs);
}
+ /**
+ * Convert the array of column names to MatchPairs of the form {@code Col_GRP__ROLLUP__}
+ *
+ * @param cols the columns to convert
+ * @return the mangled name matchpairs
+ */
+ private static MatchPair @NotNull [] makeMangledMatchPairs(String[] cols) {
+ return Arrays
+ .stream(cols).map(col -> new MatchPair(col + ROLLUP_GRP_COLUMN_ID + ROLLUP_COLUMN_SUFFIX, col))
+ .toArray(MatchPair[]::new);
+ }
+
+ /**
+ * Convert the array of strings to MatchPairs of the form Col=Col
+ *
+ * @param columns the columns to convert to MatchPairs
+ * @return an array of MatchPairs
+ */
+ private static MatchPair @NotNull [] makeSymmetricMatchPairs(String[] columns) {
+ return Arrays.stream(columns).map(col -> new MatchPair(col, col)).toArray(MatchPair[]::new);
+ }
+
// -----------------------------------------------------------------------------------------------------------------
// Rollup Unsupported Operations
// -----------------------------------------------------------------------------------------------------------------
@@ -1318,6 +1324,8 @@ public void visit(AggSpecGroup group) {
@Override
public void visit(Formula formula) {
+ unsupportedForBlinkTables("Formula for rollup");
+
final SelectColumn selectColumn = SelectColumn.of(formula.selectable());
// Get or create a column definition map composed of vectors of the original column types (or scalars when
@@ -1343,27 +1351,30 @@ public void visit(Formula formula) {
final boolean delegate;
final int existingGroupByOperatorIndex = existingGroupByOperatorIndex();
+ final MatchPair[] mangledMatchPairs = makeMangledMatchPairs(inputNonKeyColumns);
+
if (formula.reaggregateAggregatedValues()) {
if (existingGroupByOperatorIndex >= 0) {
- groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex, null,
- MatchPair.fromPairs(Pair.from(inputNonKeyColumns)));
+ groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex, null, mangledMatchPairs);
+ // TODO: do this better with an existing reaggregated op
delegate = false;
} else {
// When we are reaggregating, we do not expose the rowsets, because the next level creates a
// completely fresh operator
- groupByChunkedOperator = makeGroupByOperatorForFormula(inputNonKeyColumns, table, null);
+ groupByChunkedOperator = makeGroupByOperatorForFormula(mangledMatchPairs, table, null);
// the operator is not added, so there is delegation
delegate = true;
}
} else {
if (existingGroupByOperatorIndex >= 0) {
- groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex,
- EXPOSED_GROUP_ROW_SETS.name(), MatchPair.fromPairs(Pair.from(inputNonKeyColumns)));
+ groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex, EXPOSED_GROUP_ROW_SETS.name(), mangledMatchPairs);
+ // TODO: do this better
+ List asPairs = Arrays.stream(mangledMatchPairs).map(mp -> Pair.of(mp.input(), mp.output())).collect(Collectors.toList());
+ addNoInputOperator(groupByChunkedOperator.resultExtractor(asPairs));
delegate = false;
} else {
// When we do not reaggregate, the next level needs access to our exposed group row sets
- groupByChunkedOperator =
- makeGroupByOperatorForFormula(inputNonKeyColumns, table, EXPOSED_GROUP_ROW_SETS.name());
+ groupByChunkedOperator = makeGroupByOperatorForFormula(mangledMatchPairs, table, EXPOSED_GROUP_ROW_SETS.name());
addNoInputOperator(groupByChunkedOperator);
// we added the operator, so we cannot delegate
delegate = false;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java
index a1543c88b69..531ec4c7787 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java
@@ -3,6 +3,7 @@
//
package io.deephaven.engine.table.impl.by;
+import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.*;
import io.deephaven.chunk.attributes.ChunkLengths;
import io.deephaven.chunk.attributes.ChunkPositions;
@@ -239,8 +240,15 @@ public void propagateInitialState(@NotNull final QueryTable resultTable, int sta
} else {
final Map> columnSourceMap = resultTable.getColumnSourceMap();
sourceColumns = new HashMap<>(groupBy.getInputResultColumns().size() + 1);
- groupBy.getInputResultColumns()
- .forEach((k, v) -> sourceColumns.put(renames == null ? k : renames.get(k), v));
+ for (Map.Entry> entry : groupBy.getInputResultColumns().entrySet()) {
+ final String columnName = entry.getKey();
+ final String renamed;
+ if (renames != null && (renamed = renames.get(columnName)) != null) {
+ sourceColumns.put(renamed, entry.getValue());
+ } else {
+ sourceColumns.put(columnName, entry.getValue());
+ }
+ }
Arrays.stream(inputKeyColumns).forEach(col -> sourceColumns.put(col, columnSourceMap.get(col)));
sourceColumns.put(AggregationProcessor.ROLLUP_FORMULA_DEPTH.name(), formulaDepthSource);
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
index 73b8c6448a1..779edd80380 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
@@ -327,7 +327,7 @@ public UnaryOperator initializeRefreshing(
@Override
public Map> getInputResultColumns() {
- return resultAggregatedColumns;
+ return inputAggregatedColumns;
}
@Override
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java
index e1f213901b0..67e1c47f0a2 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java
@@ -375,8 +375,8 @@ public void testRollupGroupStatic() {
@Test
public void testRollupFormulaStatic() {
- testRollupFormulaStatic(true);
testRollupFormulaStatic(false);
+ testRollupFormulaStatic(true);
}
private void testRollupFormulaStatic(boolean withGroup) {
@@ -522,6 +522,35 @@ private void testRollupFormulaStatic3(boolean hasGroup) {
freeSnapshotTableChunks(snapshot);
}
+ @Test
+ public void testRollupFormulaGroupRenames() {
+ final int[] allValues = {10, 10, 10, 20, 20, 30, 30};
+ final Table source = newTable(
+ stringCol("Key", "Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Charlie", "Charlie"),
+ intCol("Value", allValues));
+ final RollupTable simpleSum =
+ source.rollup(List.of(AggGroup("Values=Value"), AggFormula("Sum = sum(Value)")), "Key");
+
+ final String[] arrayWithNull = new String[1];
+ final Table keyTable = newTable(
+ intCol(simpleSum.getRowDepthColumn().name(), 0),
+ stringCol("Key", arrayWithNull),
+ byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL));
+
+ final HierarchicalTable.SnapshotState ss1 = simpleSum.makeSnapshotState();
+ final Table snapshot =
+ snapshotToTable(simpleSum, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
+ TableTools.showWithRowSet(snapshot);
+
+ assertTableEquals(TableTools.newTable(intCol(simpleSum.getRowDepthColumn().name(), 1, 2, 2, 2),
+ booleanCol(simpleSum.getRowExpandedColumn().name(), true, null, null, null),
+ stringCol("Key", null, "Alpha", "Bravo", "Charlie"),
+ col("Values", iv(allValues), iv(10, 10, 10), iv(20, 20), iv(30, 30)), longCol("Sum", 130, 30, 40, 60)),
+ snapshot);
+
+ freeSnapshotTableChunks(snapshot);
+ }
+
private static Table initialExpectedGrouped(RollupTable rollup1) {
return TableTools.newTable(intCol(rollup1.getRowDepthColumn().name(), 1, 2, 3, 3, 2, 3, 3, 2, 3),
booleanCol(rollup1.getRowExpandedColumn().name(), true, true, null, null, true, null, null,
From b855a737e43b4b36837d23885510886dd7d29aea Mon Sep 17 00:00:00 2001
From: "Charles P. Wright"
Date: Wed, 14 Jan 2026 16:04:35 -0500
Subject: [PATCH 24/31] operator should always be updated for a formula,
produce mangled results.
---
.../table/impl/by/AggregationProcessor.java | 54 +++++++++++++------
.../by/FormulaMultiColumnChunkedOperator.java | 9 ++++
.../engine/table/impl/TestRollupTable.java | 2 +-
3 files changed, 48 insertions(+), 17 deletions(-)
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
index e6fdc635974..9812aaf7ae6 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
@@ -760,10 +760,21 @@ int existingGroupByReaggregateIndex() {
return -1;
}
+ /**
+ * Ensures that the existing GroupByChunkedOperator has the required input/output columns
+ *
+ * @param createExtraPairs When true, create all of the pairs for the group by operator. When false, if there
+ * are any inputs that match the pairs we'll pass the operator through. The AggGroup aggregation can't
+ * just tack pairs onto an existing operator, because the order would be incorrect. Formulas in a rollup
+ * don't expose the results of the shared grouping, so just tacking them on is fine.
+ * @param hideExtras true if the extra pairs should be hidden from results, false otherwise
+ */
GroupByChunkedOperator ensureGroupByOperator(final QueryTable table,
final int existingOperatorIndex,
final String exposeRowSetAs,
- final MatchPair[] matchPairs) {
+ final MatchPair[] matchPairs,
+ final boolean createExtraPairs,
+ final boolean hideExtras) {
boolean recreate = false;
final GroupByChunkedOperator existing = (GroupByChunkedOperator) operators.get(existingOperatorIndex);
if (exposeRowSetAs != null) {
@@ -783,9 +794,17 @@ GroupByChunkedOperator ensureGroupByOperator(final QueryTable table,
for (MatchPair matchPair : matchPairs) {
final String input = matchPair.input().name();
if (Arrays.stream(existing.getAggregatedColumnPairs()).noneMatch(p -> p.input().name().equals(input))) {
+ // we didn't have this in the input at all
newPairs.add(matchPair);
hiddenResults.add(matchPair.output().name());
recreate = true;
+ } else if (createExtraPairs
+ && Arrays.stream(existing.getAggregatedColumnPairs()).noneMatch(p -> p.equals(matchPair))) {
+ newPairs.add(matchPair);
+ if (hideExtras) {
+ hiddenResults.add(matchPair.output().name());
+ }
+ recreate = true;
}
}
if (!recreate) {
@@ -952,9 +971,11 @@ public void visit(@NotNull final Formula formula) {
final int existingGroupByOperatorIndex = existingGroupByOperatorIndex();
if (existingGroupByOperatorIndex >= 0) {
// if we have an existing group by operator, then use it (or update it to reflect our input columns)
- groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex, null, makeSymmetricMatchPairs(inputNonKeyColumns));
+ groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex, null,
+ makeSymmetricMatchPairs(inputNonKeyColumns), false, false);
} else {
- groupByChunkedOperator = makeGroupByOperatorForFormula(makeSymmetricMatchPairs(inputNonKeyColumns), table, null);
+ groupByChunkedOperator =
+ makeGroupByOperatorForFormula(makeSymmetricMatchPairs(inputNonKeyColumns), table, null);
}
final FormulaMultiColumnChunkedOperator op = new FormulaMultiColumnChunkedOperator(table,
@@ -1024,7 +1045,8 @@ public void visit(@NotNull final AggSpecGroup group) {
if (existingOperator >= 0) {
// Reuse the operator, adding a result extractor for the new result pairs
GroupByChunkedOperator existing =
- ensureGroupByOperator(table, existingOperator, null, MatchPair.fromPairs(resultPairs));
+ ensureGroupByOperator(table, existingOperator, null, MatchPair.fromPairs(resultPairs), false,
+ false);
addNoInputOperator(existing.resultExtractor(resultPairs));
} else {
addNoInputOperator(
@@ -1152,7 +1174,7 @@ private void maybeInitializeVectorColumns(Set groupByColumnSet, final Ta
/**
* Convert the array of column names to MatchPairs of the form {@code Col_GRP__ROLLUP__}
- *
+ *
* @param cols the columns to convert
* @return the mangled name matchpairs
*/
@@ -1164,7 +1186,7 @@ private void maybeInitializeVectorColumns(Set groupByColumnSet, final Ta
/**
* Convert the array of strings to MatchPairs of the form Col=Col
- *
+ *
* @param columns the columns to convert to MatchPairs
* @return an array of MatchPairs
*/
@@ -1313,7 +1335,7 @@ public void visit(AggSpecGroup group) {
if (indexOfExistingOperator >= 0) {
// share the existing operator for groupBy in a rollup base
final GroupByChunkedOperator existing = ensureGroupByOperator(table, indexOfExistingOperator,
- EXPOSED_GROUP_ROW_SETS.name(), MatchPair.fromPairs(resultPairs));
+ EXPOSED_GROUP_ROW_SETS.name(), MatchPair.fromPairs(resultPairs), false, false);
addNoInputOperator(existing.resultExtractor(resultPairs));
} else {
addNoInputOperator(new GroupByChunkedOperator(table, true, EXPOSED_GROUP_ROW_SETS.name(),
@@ -1355,8 +1377,9 @@ public void visit(Formula formula) {
if (formula.reaggregateAggregatedValues()) {
if (existingGroupByOperatorIndex >= 0) {
- groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex, null, mangledMatchPairs);
- // TODO: do this better with an existing reaggregated op
+ groupByChunkedOperator =
+ ensureGroupByOperator(table, existingGroupByOperatorIndex, null, mangledMatchPairs, true,
+ true);
delegate = false;
} else {
// When we are reaggregating, we do not expose the rowsets, because the next level creates a
@@ -1367,14 +1390,13 @@ public void visit(Formula formula) {
}
} else {
if (existingGroupByOperatorIndex >= 0) {
- groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex, EXPOSED_GROUP_ROW_SETS.name(), mangledMatchPairs);
- // TODO: do this better
- List asPairs = Arrays.stream(mangledMatchPairs).map(mp -> Pair.of(mp.input(), mp.output())).collect(Collectors.toList());
- addNoInputOperator(groupByChunkedOperator.resultExtractor(asPairs));
+ groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex,
+ EXPOSED_GROUP_ROW_SETS.name(), mangledMatchPairs, true, false);
delegate = false;
} else {
// When we do not reaggregate, the next level needs access to our exposed group row sets
- groupByChunkedOperator = makeGroupByOperatorForFormula(mangledMatchPairs, table, EXPOSED_GROUP_ROW_SETS.name());
+ groupByChunkedOperator =
+ makeGroupByOperatorForFormula(mangledMatchPairs, table, EXPOSED_GROUP_ROW_SETS.name());
addNoInputOperator(groupByChunkedOperator);
// we added the operator, so we cannot delegate
delegate = false;
@@ -1625,7 +1647,7 @@ public void visit(Formula formula) {
final int existingIndex = existingGroupByOperatorIndex();
if (existingIndex >= 0) {
- groupByOperator = ensureGroupByOperator(table, existingIndex, null, groupPairs);
+ groupByOperator = ensureGroupByOperator(table, existingIndex, null, groupPairs, true, true);
} else {
final List hiddenPairs =
Arrays.stream(groupPairs).map(mp -> mp.left().name()).collect(Collectors.toList());
@@ -1635,7 +1657,7 @@ public void visit(Formula formula) {
// everything gets hidden
final FormulaMultiColumnChunkedOperator op =
new FormulaMultiColumnChunkedOperator(table, groupByOperator,
- true, selectColumn, inputKeyColumns, null, depthSource);
+ true, selectColumn, inputKeyColumns, renames, depthSource);
addOperator(op, null, inputNonKeyColumns);
} else {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java
index 531ec4c7787..7cdffaba562 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java
@@ -24,8 +24,10 @@
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
import static io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource.BLOCK_SIZE;
@@ -252,6 +254,13 @@ public void propagateInitialState(@NotNull final QueryTable resultTable, int sta
Arrays.stream(inputKeyColumns).forEach(col -> sourceColumns.put(col, columnSourceMap.get(col)));
sourceColumns.put(AggregationProcessor.ROLLUP_FORMULA_DEPTH.name(), formulaDepthSource);
}
+ final List missingColumns = selectColumn.getColumns().stream()
+ .filter(column -> !sourceColumns.containsKey(column)).collect(Collectors.toList());
+ if (!missingColumns.isEmpty()) {
+ throw new IllegalStateException(
+ "Columns " + missingColumns + " not found, available columns are: " + sourceColumns.keySet());
+ }
+
selectColumn.initInputs(resultTable.getRowSet(), sourceColumns);
formulaDataSource = selectColumn.getDataView();
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java
index 67e1c47f0a2..18b5993f5f5 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java
@@ -458,8 +458,8 @@ public void testRollupFormulaStatic2() {
@Test
public void testRollupFormulaStatic3() {
- testRollupFormulaStatic3(true);
testRollupFormulaStatic3(false);
+ testRollupFormulaStatic3(true);
}
private void testRollupFormulaStatic3(boolean hasGroup) {
From 25df338d45f85b82fe01b73ff8b830e3e338ba74 Mon Sep 17 00:00:00 2001
From: "Charles P. Wright"
Date: Thu, 15 Jan 2026 07:54:13 -0500
Subject: [PATCH 25/31] coverage pointed out an unneeded branch.
---
.../table/impl/by/AggregationProcessor.java | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
index 9812aaf7ae6..73b0cbf3fa8 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
@@ -949,6 +949,7 @@ public void visit(@NotNull final Partition partition) {
@Override
public void visit(@NotNull final Formula formula) {
+ unsupportedForBlinkTables("Formula");
validateFormulaIsNotReaggregating(formula);
final SelectColumn selectColumn = SelectColumn.of(formula.selectable());
@@ -1645,14 +1646,14 @@ public void visit(Formula formula) {
if (formula.reaggregateAggregatedValues()) {
GroupByChunkedOperator groupByOperator;
- final int existingIndex = existingGroupByOperatorIndex();
- if (existingIndex >= 0) {
- groupByOperator = ensureGroupByOperator(table, existingIndex, null, groupPairs, true, true);
- } else {
- final List hiddenPairs =
- Arrays.stream(groupPairs).map(mp -> mp.left().name()).collect(Collectors.toList());
- groupByOperator = new GroupByChunkedOperator(table, false, null, hiddenPairs, groupPairs);
- }
+ /*
+ * There is no point looking for an existing {@link GroupByChunkedOperator}. Because we are
+ * reaggregating; an existing AggGroup would use the {@link GroupByReaggregateOperator}, and any
+ * formulas would have a delegated operator that is not exposed.
+ */
+ final List hiddenPairs =
+ Arrays.stream(groupPairs).map(mp -> mp.left().name()).collect(Collectors.toList());
+ groupByOperator = new GroupByChunkedOperator(table, false, null, hiddenPairs, groupPairs);
// everything gets hidden
final FormulaMultiColumnChunkedOperator op =
From 2d76c7c71ad4f9089ca757e1346c3830cafbce9b Mon Sep 17 00:00:00 2001
From: "Charles P. Wright"
Date: Thu, 15 Jan 2026 08:20:29 -0500
Subject: [PATCH 26/31] Share result extractor
---
.../table/impl/by/GroupByChunkedOperator.java | 43 +-----------
.../impl/by/GroupByReaggregateOperator.java | 45 +------------
.../table/impl/by/GroupByResultExtractor.java | 66 +++++++++++++++++++
3 files changed, 71 insertions(+), 83 deletions(-)
create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByResultExtractor.java
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java
index a15610c4fe6..9cdd261ea2e 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java
@@ -677,50 +677,11 @@ public List getHiddenResults() {
return hiddenResults;
}
- private class ResultExtractor implements IterativeChunkedAggregationOperator {
- final Map> resultColumns;
- final String[] inputColumnNames;
-
+ private class ResultExtractor extends GroupByResultExtractor {
private ResultExtractor(Map> resultColumns, String[] inputColumnNames) {
- this.resultColumns = resultColumns;
- this.inputColumnNames = inputColumnNames;
- }
-
- @Override
- public Map> getResultColumns() {
- return resultColumns;
- }
-
- @Override
- public void addChunk(BucketedContext context, Chunk extends Values> values,
- LongChunk extends RowKeys> inputRowKeys, IntChunk destinations,
- IntChunk startPositions, IntChunk length,
- WritableBooleanChunk stateModified) {}
-
- @Override
- public void removeChunk(BucketedContext context, Chunk extends Values> values,
- LongChunk extends RowKeys> inputRowKeys, IntChunk destinations,
- IntChunk startPositions, IntChunk length,
- WritableBooleanChunk stateModified) {}
-
- @Override
- public boolean addChunk(SingletonContext context, int chunkSize, Chunk extends Values> values,
- LongChunk extends RowKeys> inputRowKeys, long destination) {
- return false;
- }
-
- @Override
- public boolean removeChunk(SingletonContext context, int chunkSize, Chunk extends Values> values,
- LongChunk extends RowKeys> inputRowKeys, long destination) {
- return false;
+ super(resultColumns, inputColumnNames);
}
- @Override
- public void ensureCapacity(long tableSize) {}
-
- @Override
- public void startTrackingPrevValues() {}
-
@Override
public UnaryOperator initializeRefreshing(@NotNull QueryTable resultTable,
@NotNull LivenessReferent aggregationUpdateListener) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
index 779edd80380..d4716424ae3 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
@@ -524,50 +524,11 @@ public List getHiddenResults() {
return hiddenResults;
}
- private class ResultExtractor implements IterativeChunkedAggregationOperator {
- final Map> resultColumns;
- final String[] inputColumnNames;
-
- private ResultExtractor(Map> resultColumns, String[] inputColumnNames) {
- this.resultColumns = resultColumns;
- this.inputColumnNames = inputColumnNames;
- }
-
- @Override
- public Map> getResultColumns() {
- return resultColumns;
- }
-
- @Override
- public void addChunk(BucketedContext context, Chunk extends Values> values,
- LongChunk extends RowKeys> inputRowKeys, IntChunk destinations,
- IntChunk startPositions, IntChunk length,
- WritableBooleanChunk stateModified) {}
-
- @Override
- public void removeChunk(BucketedContext context, Chunk extends Values> values,
- LongChunk extends RowKeys> inputRowKeys, IntChunk destinations,
- IntChunk startPositions, IntChunk length,
- WritableBooleanChunk stateModified) {}
-
- @Override
- public boolean addChunk(SingletonContext context, int chunkSize, Chunk extends Values> values,
- LongChunk extends RowKeys> inputRowKeys, long destination) {
- return false;
- }
-
- @Override
- public boolean removeChunk(SingletonContext context, int chunkSize, Chunk extends Values> values,
- LongChunk extends RowKeys> inputRowKeys, long destination) {
- return false;
+ private class ResultExtractor extends GroupByResultExtractor {
+ ResultExtractor(Map> resultColumns, String[] inputColumnNames) {
+ super(resultColumns, inputColumnNames);
}
- @Override
- public void ensureCapacity(long tableSize) {}
-
- @Override
- public void startTrackingPrevValues() {}
-
@Override
public UnaryOperator initializeRefreshing(@NotNull QueryTable resultTable,
@NotNull LivenessReferent aggregationUpdateListener) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByResultExtractor.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByResultExtractor.java
new file mode 100644
index 00000000000..2817d5ece94
--- /dev/null
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByResultExtractor.java
@@ -0,0 +1,66 @@
+//
+// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.engine.table.impl.by;
+
+import io.deephaven.chunk.Chunk;
+import io.deephaven.chunk.IntChunk;
+import io.deephaven.chunk.LongChunk;
+import io.deephaven.chunk.WritableBooleanChunk;
+import io.deephaven.chunk.attributes.ChunkLengths;
+import io.deephaven.chunk.attributes.ChunkPositions;
+import io.deephaven.chunk.attributes.Values;
+import io.deephaven.engine.rowset.chunkattributes.RowKeys;
+import io.deephaven.engine.table.ColumnSource;
+
+import java.util.Map;
+
+/**
+ * Shared class for the {@link GroupByReaggregateOperator} and {@link GroupByChunkedOperator} that gets results from the
+ * original operator and adds them as new result columns, so that only one heavy-weight operator is needed but results
+ * can be in any order when multiple {@link io.deephaven.api.agg.spec.AggSpecGroup}s are specified.
+ */
+abstract class GroupByResultExtractor implements IterativeChunkedAggregationOperator {
+ final Map> resultColumns;
+ final String[] inputColumnNames;
+
+ GroupByResultExtractor(Map> resultColumns, String[] inputColumnNames) {
+ this.resultColumns = resultColumns;
+ this.inputColumnNames = inputColumnNames;
+ }
+
+ @Override
+ public Map> getResultColumns() {
+ return resultColumns;
+ }
+
+ @Override
+ public void addChunk(BucketedContext context, Chunk extends Values> values,
+ LongChunk extends RowKeys> inputRowKeys, IntChunk destinations,
+ IntChunk startPositions, IntChunk length,
+ WritableBooleanChunk stateModified) {}
+
+ @Override
+ public void removeChunk(BucketedContext context, Chunk extends Values> values,
+ LongChunk extends RowKeys> inputRowKeys, IntChunk destinations,
+ IntChunk startPositions, IntChunk length,
+ WritableBooleanChunk stateModified) {}
+
+ @Override
+ public boolean addChunk(SingletonContext context, int chunkSize, Chunk extends Values> values,
+ LongChunk extends RowKeys> inputRowKeys, long destination) {
+ return false;
+ }
+
+ @Override
+ public boolean removeChunk(SingletonContext context, int chunkSize, Chunk extends Values> values,
+ LongChunk extends RowKeys> inputRowKeys, long destination) {
+ return false;
+ }
+
+ @Override
+ public void ensureCapacity(long tableSize) {}
+
+ @Override
+ public void startTrackingPrevValues() {}
+}
From 75403f890cc6d93f02bc0b1607015edd9798c8e6 Mon Sep 17 00:00:00 2001
From: "Charles P. Wright"
Date: Thu, 15 Jan 2026 09:34:47 -0500
Subject: [PATCH 27/31] Add broken removal test.
---
.../impl/by/GroupByReaggregateOperator.java | 2 +-
.../engine/table/impl/TestRollupTable.java | 25 +++++++++++++++++++
2 files changed, 26 insertions(+), 1 deletion(-)
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
index d4716424ae3..462b30e40dd 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
@@ -231,7 +231,7 @@ private void removeChunk(@NotNull final ObjectChunk ro
if (length == 0) {
return;
}
- accumulateToBuilderRandom(removedBuilders, rowSets, start, length, destination, false);
+ accumulateToBuilderRandom(removedBuilders, rowSets, start, length, destination, true);
stepDestinationsModified.addKey(destination);
}
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java
index 18b5993f5f5..032bfca9759 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java
@@ -623,6 +623,31 @@ public void testRollupGroupIncremental() {
TableTools.showWithRowSet(expected2);
assertTableEquals(expected2, snapshot2);
freeSnapshotTableChunks(snapshot2);
+
+ TableTools.showWithRowSet(source);
+ // remove a key from source, so that reaggregate has to do some removals
+ cug.runWithinUnitTestCycle(() -> {
+ removeRows(source, i(0));
+ source.notifyListeners(
+ new TableUpdateImpl(i(), i(0), i(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY));
+ });
+ TableTools.showWithRowSet(source);
+
+ final Table snapshot3 =
+ snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
+ TableTools.showWithRowSet(snapshot2);
+ Table expected3 = TableTools.newTable(intCol(rollup1.getRowDepthColumn().name(), 1, 2, 3, 2, 3, 3, 2, 3),
+ booleanCol(rollup1.getRowExpandedColumn().name(), true, true, null, true, null, null,
+ true, null),
+ col("Key1", null, "Alpha", "Alpha", "Bravo", "Bravo", "Bravo", "Charlie", "Charlie"),
+ col("Key2", null, null, "Echo", null, "Delta", "Echo", null, "Echo"),
+ col("Sentinel", iv(2, 3, 4, 5, 7, 8, 9), iv(3, 8), iv(3, 8), iv(2, 7), iv(2), iv(7),
+ iv(4, 5, 9), iv(4, 5, 9)))
+ .update("Sum=sum(Sentinel)");
+
+ TableTools.showWithRowSet(expected3);
+ assertTableEquals(expected3, snapshot3);
+ freeSnapshotTableChunks(snapshot3);
}
@Test
From eb07f99fd2bc6f1118ffa7c1e3d5ee1be8d39fc7 Mon Sep 17 00:00:00 2001
From: "Charles P. Wright"
Date: Fri, 16 Jan 2026 09:57:40 -0500
Subject: [PATCH 28/31] Fix for the test.
---
.../impl/by/FormulaMultiColumnChunkedOperator.java | 13 ++++++++-----
.../table/impl/by/GroupByReaggregateOperator.java | 6 +++++-
2 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java
index 7cdffaba562..79d38691311 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java
@@ -22,10 +22,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -291,7 +288,13 @@ public UnaryOperator initializeRefreshing(@NotNull final Quer
return inputToResultModifiedColumnSetFactory = input -> ModifiedColumnSet.EMPTY;
}
final ModifiedColumnSet resultMCS = resultTable.newModifiedColumnSet(selectColumn.getName());
- final String[] inputColumnNames = selectColumn.getColumns().toArray(String[]::new);
+ final Map inverseRenames = new HashMap<>();
+ if (renames != null) {
+ renames.forEach((k, v) -> inverseRenames.put(v, k));
+ }
+ final String[] inputColumnNames = selectColumn.getColumns().stream()
+ .filter(c -> !c.equals(AggregationProcessor.ROLLUP_FORMULA_DEPTH.name()))
+ .map(c -> inverseRenames.getOrDefault(c, c)).toArray(String[]::new);
final ModifiedColumnSet inputMCS = inputTable.newModifiedColumnSet(inputColumnNames);
return inputToResultModifiedColumnSetFactory = input -> {
if (groupBy.hasModifications(input.containsAny(inputMCS))) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
index 462b30e40dd..8839acee713 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
@@ -338,6 +338,7 @@ public boolean hasModifications(boolean columnsModified) {
private class InputToResultModifiedColumnSetFactory implements UnaryOperator {
private final ModifiedColumnSet updateModifiedColumnSet;
+ private final ModifiedColumnSet allOutputColumns;
private final ModifiedColumnSet.Transformer aggregatedColumnsTransformer;
private InputToResultModifiedColumnSetFactory(
@@ -352,13 +353,16 @@ private InputToResultModifiedColumnSetFactory(
for (int ci = 0; ci < inputColumnNames.length; ++ci) {
affectedColumns[ci] = resultTable.newModifiedColumnSet(resultAggregatedColumnNames[ci]);
}
- affectedColumns[allInputs.length - 1] = resultTable.newModifiedColumnSet(allInputs);
+ affectedColumns[allInputs.length - 1] = allOutputColumns = resultTable.newModifiedColumnSet(allInputs);
aggregatedColumnsTransformer = inputTable.newModifiedColumnSetTransformer(allInputs, affectedColumns);
}
@Override
public ModifiedColumnSet apply(@NotNull final ModifiedColumnSet upstreamModifiedColumnSet) {
+ if (rowsetsModified) {
+ return allOutputColumns;
+ }
aggregatedColumnsTransformer.clearAndTransform(upstreamModifiedColumnSet, updateModifiedColumnSet);
return updateModifiedColumnSet;
}
From 3d7c33d687624d2e7acf5a0d6bc325c5fac0829f Mon Sep 17 00:00:00 2001
From: "Charles P. Wright"
Date: Fri, 16 Jan 2026 15:40:35 -0500
Subject: [PATCH 29/31] more coverage, fix for zero sized snapshot chunk leak
---
.../engine/table/impl/TestRollupTable.java | 48 ++++++++++++++++---
.../testutil/HierarchicalTableTestTools.java | 2 +
2 files changed, 43 insertions(+), 7 deletions(-)
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java
index 032bfca9759..84f498ef600 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java
@@ -7,8 +7,7 @@
import io.deephaven.api.SortColumn;
import io.deephaven.api.agg.Aggregation;
import io.deephaven.engine.context.QueryScope;
-import io.deephaven.engine.rowset.RowSetFactory;
-import io.deephaven.engine.rowset.RowSetShiftData;
+import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.hierarchical.HierarchicalTable;
@@ -620,22 +619,18 @@ public void testRollupGroupIncremental() {
snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
TableTools.showWithRowSet(snapshot2);
Table expected2 = secondExpectedGrouped(rollup1);
- TableTools.showWithRowSet(expected2);
assertTableEquals(expected2, snapshot2);
freeSnapshotTableChunks(snapshot2);
- TableTools.showWithRowSet(source);
// remove a key from source, so that reaggregate has to do some removals
cug.runWithinUnitTestCycle(() -> {
removeRows(source, i(0));
source.notifyListeners(
new TableUpdateImpl(i(), i(0), i(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY));
});
- TableTools.showWithRowSet(source);
final Table snapshot3 =
snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
- TableTools.showWithRowSet(snapshot2);
Table expected3 = TableTools.newTable(intCol(rollup1.getRowDepthColumn().name(), 1, 2, 3, 2, 3, 3, 2, 3),
booleanCol(rollup1.getRowExpandedColumn().name(), true, true, null, true, null, null,
true, null),
@@ -645,9 +640,48 @@ public void testRollupGroupIncremental() {
iv(4, 5, 9), iv(4, 5, 9)))
.update("Sum=sum(Sentinel)");
- TableTools.showWithRowSet(expected3);
assertTableEquals(expected3, snapshot3);
freeSnapshotTableChunks(snapshot3);
+
+ // remove everything, we want to validate the zero key removals for the operator
+ cug.runWithinUnitTestCycle(() -> {
+ final RowSet toRemove = source.getRowSet().copy();
+ System.out.println("To Remove: " + toRemove);
+ removeRows(source, toRemove);
+ source.notifyListeners(
+ new TableUpdateImpl(i(), toRemove, i(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY));
+ });
+
+ final Table snapshot4 =
+ snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
+ final Table expected4 = TableTools.newTable(intCol(rollup1.getRowDepthColumn().name()),
+ booleanCol(rollup1.getRowExpandedColumn().name()),
+ stringCol("Key1"),
+ stringCol("Key2"),
+ col("Sentinel", new IntVector[0]),
+ longCol("Sum")).where("false");
+
+ assertTableEquals(expected4, snapshot4);
+ TableTools.showWithRowSet(snapshot4);
+ freeSnapshotTableChunks(snapshot4);
+
+ // we should make sure there are some additions in reaggregation, let's just add the whole original back
+ cug.runWithinUnitTestCycle(() -> {
+ final WritableRowSet toAdd = RowSetFactory.flat(7);
+ TstUtils.addToTable(source, toAdd,
+ stringCol("Key1", "Alpha", "Bravo", "Alpha", "Charlie", "Charlie", "Bravo", "Bravo"),
+ stringCol("Key2", "Delta", "Delta", "Echo", "Echo", "Echo", "Echo", "Echo"),
+ intCol("Sentinel", 1, 2, 3, 4, 5, 6, 7));
+
+ source.notifyListeners(
+ new TableUpdateImpl(toAdd, i(), i(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY));
+ });
+
+ final Table snapshot5 =
+ snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
+
+ assertTableEquals(expected, snapshot5);
+ freeSnapshotTableChunks(snapshot5);
}
@Test
diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/HierarchicalTableTestTools.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/HierarchicalTableTestTools.java
index 31a77ed7f56..3ebbd4c1321 100644
--- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/HierarchicalTableTestTools.java
+++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/HierarchicalTableTestTools.java
@@ -101,6 +101,8 @@ public static Table snapshotToTable(
chunk.getChunkType(), columnDefinition.getDataType(), columnDefinition.getComponentType());
if (snapshotSize > 0) {
chunkColumnSource.addChunk(chunk);
+ } else {
+ chunk.close();
}
final ColumnSource> source;
if (columnDefinition.getDataType() == Boolean.class && chunkColumnSource.getType() == byte.class) {
From 15f02649a2f14322a60da3003395d9c06f80eebf Mon Sep 17 00:00:00 2001
From: "Charles P. Wright"
Date: Fri, 16 Jan 2026 16:32:41 -0500
Subject: [PATCH 30/31] Deduplicate a bit.
---
.../table/impl/by/AggregationProcessor.java | 175 +++++++++---------
1 file changed, 84 insertions(+), 91 deletions(-)
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
index 73b0cbf3fa8..61130acf3ba 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
@@ -103,6 +103,7 @@
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import org.jspecify.annotations.NonNull;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -264,7 +265,16 @@ public static AggregationContextFactory forSelectDistinct() {
public static final ColumnName EXPOSED_GROUP_ROW_SETS = ColumnName.of("__EXPOSED_GROUP_ROW_SETS__");
+ /**
+ * The name of the column we inject into AggFormula to permit the user to vary the formula based on which level of
+ * the rollup is being evaluated.
+ */
public static final ColumnName ROLLUP_FORMULA_DEPTH = ColumnName.of("__FORMULA_DEPTH__");
+ /**
+ * The definition of the formula depth column in a rollup.
+ */
+ private static final Map> FORMULA_DEPTH_DEFINITION = Map.of(ROLLUP_FORMULA_DEPTH.name(),
+ ColumnDefinition.of(ROLLUP_FORMULA_DEPTH.name(), IntType.of()));
/**
* Create a trivial {@link AggregationContextFactory} to {@link Aggregation#AggGroup(String...) group} the input
@@ -349,6 +359,7 @@ private abstract class Converter implements Aggregation.Visitor, AggSpec.Visitor
final List inputColumnNames = new ArrayList<>();
final List> inputSources = new ArrayList<>();
final List transformers = new ArrayList<>();
+ final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor = QueryCompilerRequestProcessor.batch();
List resultPairs = List.of();
int freezeByCountIndex = -1;
@@ -369,7 +380,9 @@ private Converter(
AggregationContext build() {
walkAllAggregations();
transformers.add(new RowLookupAttributeSetter());
- return makeAggregationContext();
+ AggregationContext aggregationContext = makeAggregationContext();
+ compilationProcessor.compile();
+ return aggregationContext;
}
final void walkAllAggregations() {
@@ -883,6 +896,30 @@ GroupByReaggregateOperator ensureGroupByReaggregateOperator(final QueryTable tab
operators.set(existingOperatorIndex, newOperator);
return newOperator;
}
+
+ @NonNull
+ PrepareFormulaResult prepareFormula(SelectColumn selectColumn, final TableDefinition tableDefinition,
+ final Map> extraColumns) {
+ // Get or create a column definition map composed of vectors of the original column types (or scalars when
+ // part of the key columns).
+ final Set groupByColumnSet = Set.of(groupByColumnNames);
+
+ maybeInitializeVectorColumns(groupByColumnSet, tableDefinition, extraColumns);
+
+ // Get the input column names from the formula and provide them to the groupBy operator
+ final String[] allInputColumns =
+ selectColumn.initDef(vectorColumnDefinitions, compilationProcessor).toArray(String[]::new);
+
+ final Map> partitioned = Arrays.stream(allInputColumns)
+ .collect(Collectors.partitioningBy(
+ o -> groupByColumnSet.contains(o) || ROLLUP_FORMULA_DEPTH.name().equals(o)));
+ final String[] inputKeyColumns = partitioned.get(true).toArray(String[]::new);
+ final String[] inputNonKeyColumns = partitioned.get(false).toArray(String[]::new);
+
+ validateSelectColumnForFormula(selectColumn);
+
+ return new PrepareFormulaResult(inputKeyColumns, inputNonKeyColumns);
+ }
}
// -----------------------------------------------------------------------------------------------------------------
@@ -894,21 +931,11 @@ GroupByReaggregateOperator ensureGroupByReaggregateOperator(final QueryTable tab
* {@link AggregationContext} for standard aggregations. Accumulates state by visiting each aggregation.
*/
private final class NormalConverter extends Converter {
- private final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor;
-
private NormalConverter(
@NotNull final Table table,
final boolean requireStateChangeRecorder,
@NotNull final String... groupByColumnNames) {
super(table, requireStateChangeRecorder, groupByColumnNames);
- this.compilationProcessor = QueryCompilerRequestProcessor.batch();
- }
-
- @Override
- AggregationContext build() {
- final AggregationContext resultContext = super.build();
- compilationProcessor.compile();
- return resultContext;
}
// -------------------------------------------------------------------------------------------------------------
@@ -953,34 +980,23 @@ public void visit(@NotNull final Formula formula) {
validateFormulaIsNotReaggregating(formula);
final SelectColumn selectColumn = SelectColumn.of(formula.selectable());
- // Get or create a column definition map composed of vectors of the original column types (or scalars when
- // part of the key columns).
- final Set groupByColumnSet = Set.of(groupByColumnNames);
- maybeInitializeVectorColumns(groupByColumnSet, table.getDefinition(), Map.of());
+ final PrepareFormulaResult prepareFormula = prepareFormula(selectColumn, table.getDefinition(), Map.of());
- // Get the input column names from the formula and provide them to the groupBy operator
- final String[] allInputColumns =
- selectColumn.initDef(vectorColumnDefinitions, compilationProcessor).toArray(String[]::new);
-
- final Map> partitioned = Arrays.stream(allInputColumns)
- .collect(Collectors.partitioningBy(groupByColumnSet::contains));
- final String[] inputKeyColumns = partitioned.get(true).toArray(String[]::new);
- final String[] inputNonKeyColumns = partitioned.get(false).toArray(String[]::new);
-
- validateSelectColumnForFormula(selectColumn);
final GroupByChunkedOperator groupByChunkedOperator;
final int existingGroupByOperatorIndex = existingGroupByOperatorIndex();
if (existingGroupByOperatorIndex >= 0) {
// if we have an existing group by operator, then use it (or update it to reflect our input columns)
groupByChunkedOperator = ensureGroupByOperator(table, existingGroupByOperatorIndex, null,
- makeSymmetricMatchPairs(inputNonKeyColumns), false, false);
+ makeSymmetricMatchPairs(prepareFormula.inputNonKeyColumns), false, false);
} else {
groupByChunkedOperator =
- makeGroupByOperatorForFormula(makeSymmetricMatchPairs(inputNonKeyColumns), table, null);
+ makeGroupByOperatorForFormula(makeSymmetricMatchPairs(prepareFormula.inputNonKeyColumns), table,
+ null);
}
final FormulaMultiColumnChunkedOperator op = new FormulaMultiColumnChunkedOperator(table,
- groupByChunkedOperator, existingGroupByOperatorIndex < 0, selectColumn, inputKeyColumns, null,
+ groupByChunkedOperator, existingGroupByOperatorIndex < 0, selectColumn,
+ prepareFormula.inputKeyColumns, null,
null);
addNoInputOperator(op);
}
@@ -1129,6 +1145,9 @@ public void visit(@NotNull final AggSpecVar var) {
}
}
+ /**
+ * AggFormula SelectColumns are not permitted to use column arrays or virtual row variables; verify that.
+ */
private static void validateSelectColumnForFormula(SelectColumn selectColumn) {
if (!selectColumn.getColumnArrays().isEmpty()) {
throw new IllegalArgumentException("AggFormula does not support column arrays ("
@@ -1139,12 +1158,22 @@ private static void validateSelectColumnForFormula(SelectColumn selectColumn) {
}
}
+ /**
+ * Don't let AggFormula formulas reaggregate except in a rollup.
+ */
private static void validateFormulaIsNotReaggregating(Formula formula) {
if (formula.reaggregateAggregatedValues()) {
throw new IllegalArgumentException("AggFormula does not support reaggregating except in a rollup.");
}
}
+ /**
+ * Setup the column definitions for a formula.
+ *
+ * @param groupByColumnSet group by columns are passed through unchanged (as scalars)
+ * @param definition the definition of our source table
+ * @param extraColumns extra columns to add (for formula depth)
+ */
private void maybeInitializeVectorColumns(Set groupByColumnSet, final TableDefinition definition,
Map> extraColumns) {
if (vectorColumnDefinitions != null) {
@@ -1277,7 +1306,6 @@ private static void rollupUnsupported(@NotNull final String operationName, final
*/
private final class RollupBaseConverter extends Converter
implements RollupAggregation.Visitor, UnsupportedRollupAggregations {
- private final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor;
private int nextColumnIdentifier = 0;
@@ -1286,13 +1314,11 @@ private RollupBaseConverter(
final boolean requireStateChangeRecorder,
@NotNull final String... groupByColumnNames) {
super(table, requireStateChangeRecorder, groupByColumnNames);
- this.compilationProcessor = QueryCompilerRequestProcessor.batch();
}
@Override
AggregationContext build() {
final AggregationContext resultContext = super.build();
- compilationProcessor.compile();
return resultContext;
}
@@ -1351,30 +1377,14 @@ public void visit(Formula formula) {
final SelectColumn selectColumn = SelectColumn.of(formula.selectable());
- // Get or create a column definition map composed of vectors of the original column types (or scalars when
- // part of the key columns).
- final Set groupByColumnSet = Set.of(groupByColumnNames);
- // For the base of a rollup, we use the original table definition, but tack on a rollup depth column
- maybeInitializeVectorColumns(groupByColumnSet, table.getDefinition(), Map.of(ROLLUP_FORMULA_DEPTH.name(),
- ColumnDefinition.of(ROLLUP_FORMULA_DEPTH.name(), IntType.of())));
-
- // Get the input column names from the formula and provide them to the groupBy operator
- final String[] allInputColumns =
- selectColumn.initDef(vectorColumnDefinitions, compilationProcessor).toArray(String[]::new);
-
- final Map> partitioned = Arrays.stream(allInputColumns)
- .collect(Collectors.partitioningBy(
- o -> groupByColumnSet.contains(o) || ROLLUP_FORMULA_DEPTH.name().equals(o)));
- final String[] inputKeyColumns = partitioned.get(true).toArray(String[]::new);
- final String[] inputNonKeyColumns = partitioned.get(false).toArray(String[]::new);
-
- validateSelectColumnForFormula(selectColumn);
+ final PrepareFormulaResult prepareFormula =
+ prepareFormula(selectColumn, table.getDefinition(), FORMULA_DEPTH_DEFINITION);
final GroupByChunkedOperator groupByChunkedOperator;
final boolean delegate;
final int existingGroupByOperatorIndex = existingGroupByOperatorIndex();
- final MatchPair[] mangledMatchPairs = makeMangledMatchPairs(inputNonKeyColumns);
+ final MatchPair[] mangledMatchPairs = makeMangledMatchPairs(prepareFormula.inputNonKeyColumns);
if (formula.reaggregateAggregatedValues()) {
if (existingGroupByOperatorIndex >= 0) {
@@ -1408,7 +1418,7 @@ public void visit(Formula formula) {
depthSource.set(groupByColumnNames.length);
final FormulaMultiColumnChunkedOperator op = new FormulaMultiColumnChunkedOperator(table,
- groupByChunkedOperator, delegate, selectColumn, inputKeyColumns, null, depthSource);
+ groupByChunkedOperator, delegate, selectColumn, prepareFormula.inputKeyColumns, null, depthSource);
addNoInputOperator(op);
}
@@ -1519,7 +1529,6 @@ IterativeChunkedAggregationOperator apply(
private final class RollupReaggregatedConverter extends Converter
implements RollupAggregation.Visitor, UnsupportedRollupAggregations {
- private final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor;
private int nextColumnIdentifier = 0;
private RollupReaggregatedConverter(
@@ -1527,14 +1536,6 @@ private RollupReaggregatedConverter(
final boolean requireStateChangeRecorder,
@NotNull final String... groupByColumnNames) {
super(table, requireStateChangeRecorder, groupByColumnNames);
- this.compilationProcessor = QueryCompilerRequestProcessor.batch();
- }
-
- @Override
- AggregationContext build() {
- final AggregationContext resultContext = super.build();
- compilationProcessor.compile();
- return resultContext;
}
// -------------------------------------------------------------------------------------------------------------
@@ -1601,42 +1602,23 @@ public void visit(AggSpecGroup group) {
public void visit(Formula formula) {
final SelectColumn selectColumn = SelectColumn.of(formula.selectable());
- // Get or create a column definition map composed of vectors of the original column types (or scalars when
- // part of the key columns).
- final Set groupByColumnSet = Set.of(groupByColumnNames);
-
- // for a reaggregated formula, we can't use the input definition as is; we want to use the definition from
- // the source table; but tack on our rollup depth column
- AggregationProcessor thisProcessor = AggregationProcessor.this;
- final TableDefinition sourceDefinition = ((WithSource) thisProcessor).source.getDefinition();
- maybeInitializeVectorColumns(groupByColumnSet, sourceDefinition, Map.of(ROLLUP_FORMULA_DEPTH.name(),
- ColumnDefinition.of(ROLLUP_FORMULA_DEPTH.name(), IntType.of())));
-
- // Get the input column names from the formula and provide them to the groupBy operator
- final String[] allInputColumns =
- selectColumn.initDef(vectorColumnDefinitions, compilationProcessor).toArray(String[]::new);
-
- final Map> partitioned = Arrays.stream(allInputColumns)
- .collect(Collectors.partitioningBy(
- o -> groupByColumnSet.contains(o) || ROLLUP_FORMULA_DEPTH.name().equals(o)));
- final String[] inputKeyColumns = partitioned.get(true).toArray(String[]::new);
- final String[] inputNonKeyColumns = partitioned.get(false).toArray(String[]::new);
-
- validateSelectColumnForFormula(selectColumn);
+ final PrepareFormulaResult prepareFormula = prepareFormula(selectColumn,
+ ((WithSource) AggregationProcessor.this).source.getDefinition(), FORMULA_DEPTH_DEFINITION);
final Map renames = new HashMap<>();
- final MatchPair[] groupPairs = new MatchPair[inputNonKeyColumns.length];
+ final MatchPair[] groupPairs = new MatchPair[prepareFormula.inputNonKeyColumns.length];
- for (int ii = 0; ii < inputNonKeyColumns.length; ++ii) {
- final String mangledColumn = inputNonKeyColumns[ii] + ROLLUP_GRP_COLUMN_ID + ROLLUP_COLUMN_SUFFIX;
+ for (int ii = 0; ii < prepareFormula.inputNonKeyColumns.length; ++ii) {
+ final String mangledColumn =
+ prepareFormula.inputNonKeyColumns[ii] + ROLLUP_GRP_COLUMN_ID + ROLLUP_COLUMN_SUFFIX;
if (table.hasColumns(mangledColumn)) {
groupPairs[ii] = new MatchPair(mangledColumn, mangledColumn);
- renames.put(mangledColumn, inputNonKeyColumns[ii]);
+ renames.put(mangledColumn, prepareFormula.inputNonKeyColumns[ii]);
} else {
// reagg uses the output name
- groupPairs[ii] = new MatchPair(mangledColumn, inputNonKeyColumns[ii]);
+ groupPairs[ii] = new MatchPair(mangledColumn, prepareFormula.inputNonKeyColumns[ii]);
// we are not changing the input column name, so don't need the rename
- renames.put(inputNonKeyColumns[ii], inputNonKeyColumns[ii]);
+ renames.put(prepareFormula.inputNonKeyColumns[ii], prepareFormula.inputNonKeyColumns[ii]);
}
}
@@ -1658,9 +1640,9 @@ public void visit(Formula formula) {
// everything gets hidden
final FormulaMultiColumnChunkedOperator op =
new FormulaMultiColumnChunkedOperator(table, groupByOperator,
- true, selectColumn, inputKeyColumns, renames, depthSource);
+ true, selectColumn, prepareFormula.inputKeyColumns, renames, depthSource);
- addOperator(op, null, inputNonKeyColumns);
+ addOperator(op, null, prepareFormula.inputNonKeyColumns);
} else {
final ColumnSource> groupRowSet = table.getColumnSource(EXPOSED_GROUP_ROW_SETS.name());
GroupByReaggregateOperator groupByOperator;
@@ -1678,11 +1660,12 @@ public void visit(Formula formula) {
final FormulaMultiColumnChunkedOperator op =
new FormulaMultiColumnChunkedOperator(table, groupByOperator,
- false, selectColumn, inputKeyColumns, renames, depthSource);
+ false, selectColumn, prepareFormula.inputKeyColumns, renames, depthSource);
addOperator(op, groupRowSet, EXPOSED_GROUP_ROW_SETS.name());
}
}
+
// -------------------------------------------------------------------------------------------------------------
// AggSpec.Visitor
// -------------------------------------------------------------------------------------------------------------
@@ -2587,4 +2570,14 @@ private WithSource(@NotNull Collection extends Aggregation> aggregations, @Not
this.source = source;
}
}
+
+ private static class PrepareFormulaResult {
+ public final String[] inputKeyColumns;
+ public final String[] inputNonKeyColumns;
+
+ public PrepareFormulaResult(String[] inputKeyColumns, String[] inputNonKeyColumns) {
+ this.inputKeyColumns = inputKeyColumns;
+ this.inputNonKeyColumns = inputNonKeyColumns;
+ }
+ }
}
From d1f0315f2f1c88621aa8613387865aeca26a947f Mon Sep 17 00:00:00 2001
From: "Charles P. Wright"
Date: Thu, 22 Jan 2026 15:51:47 -0500
Subject: [PATCH 31/31] larry comments
---
.../engine/table/impl/by/AggregationProcessor.java | 8 ++++++--
.../engine/table/impl/by/GroupByReaggregateOperator.java | 2 +-
2 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
index 61130acf3ba..487c55cb522 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java
@@ -103,7 +103,6 @@
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.jspecify.annotations.NonNull;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -844,6 +843,11 @@ GroupByChunkedOperator ensureGroupByOperator(final QueryTable table,
return newOperator;
}
+ /**
+ * Make sure the group by reaggregate operator at existingOperatorIndex has the needed outputs.
+ *
+ * @return the existing operator; or a new one that has suitable outputs.
+ */
GroupByReaggregateOperator ensureGroupByReaggregateOperator(final QueryTable table,
final int existingOperatorIndex,
final String exposeRowSetAs,
@@ -897,7 +901,7 @@ GroupByReaggregateOperator ensureGroupByReaggregateOperator(final QueryTable tab
return newOperator;
}
- @NonNull
+ @NotNull
PrepareFormulaResult prepareFormula(SelectColumn selectColumn, final TableDefinition tableDefinition,
final Map> extraColumns) {
// Get or create a column definition map composed of vectors of the original column types (or scalars when
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
index 8839acee713..c993244808f 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java
@@ -95,7 +95,7 @@ public GroupByReaggregateOperator(
if (!(source instanceof AggregateColumnSource)) {
throw new IllegalStateException("Expect to reaggregate AggregateColumnSources for a group operation.");
}
- @SuppressWarnings("rawtypes")
+ // noinspection rawtypes
final ColumnSource> realSource = ((AggregateColumnSource) source).getAggregatedSource();
final AggregateColumnSource, ?> aggregateColumnSource = AggregateColumnSource.make(realSource, rowSets);
if (hiddenResults == null || !hiddenResults.contains(pair.output().name())) {