Skip to content

Commit cab7ef6

Browse files
committed
Sorting keys for merge sort to work
Signed-off-by: expani <anijainc@amazon.com>
1 parent f475eca commit cab7ef6

3 files changed

Lines changed: 33 additions & 29 deletions

File tree

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,7 @@ public List<InternalAggregation> convert(Map<String, Object[]> shardResult, Sear
771771
));
772772
row++;
773773
}
774+
buckets.sort(InternalComposite.InternalBucket::compareKey);
774775
CompositeKey lastBucket = buckets.isEmpty() ? null : buckets.getLast().getRawKey();
775776
return List.of(
776777
new InternalComposite(

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151

5252
import java.io.IOException;
5353
import java.util.ArrayList;
54+
import java.util.Arrays;
55+
import java.util.Collections;
5456
import java.util.List;
5557
import java.util.Map;
5658

@@ -116,34 +118,38 @@ protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subs
116118
}
117119

118120
@Override
119-
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row, SearchContext searchContext) {
120-
String termKey = (String) searchContext.convertToComparable(shardResult.get(name)[row]);
121-
long docCount = 1;
122-
123-
List<InternalAggregation> subAggs = new ArrayList<>();
124-
for (Aggregator aggregator : subAggregators) {
125-
if (aggregator instanceof ShardResultConvertor convertor) {
126-
InternalAggregation subAgg = convertor.convertRow(shardResult, row, searchContext);
127-
if (aggregator instanceof ValueCountAggregator) {
128-
docCount = ((InternalValueCount) subAgg).getValue();
121+
public List<InternalAggregation> convert(Map<String, Object[]> shardResult, SearchContext searchContext) {
122+
int rowCount = shardResult.get(shardResult.keySet().stream().findFirst().get()).length;
123+
List<StringTerms.Bucket> buckets = new ArrayList<>(rowCount);
124+
for (int row = 0; row < rowCount; row++) {
125+
String termKey = (String) searchContext.convertToComparable(shardResult.get(name)[row]);
126+
List<InternalAggregation> subAggs = new ArrayList<>();
127+
long docCount = 1;
128+
for (Aggregator aggregator : subAggregators) {
129+
if (aggregator instanceof ShardResultConvertor convertor) {
130+
InternalAggregation subAgg = convertor.convertRow(shardResult, row, searchContext);
131+
if (aggregator instanceof ValueCountAggregator) {
132+
docCount = ((InternalValueCount) subAgg).getValue();
133+
}
134+
subAggs.add(subAgg);
129135
}
130-
subAggs.add(subAgg);
131136
}
132-
}
133137

138+
buckets.add(new StringTerms.Bucket(
139+
new BytesRef(termKey),
140+
docCount,
141+
InternalAggregations.from(subAggs),
142+
showTermDocCountError,
143+
0,
144+
format
145+
));
146+
}
134147
BucketOrder reduceOrder = order;
135148
if (isKeyOrder(order) == false) {
136149
reduceOrder = InternalOrder.key(true);
150+
buckets.sort(reduceOrder.comparator());
137151
}
138-
StringTerms.Bucket bucket = new StringTerms.Bucket(
139-
new BytesRef(termKey),
140-
docCount,
141-
InternalAggregations.from(subAggs),
142-
showTermDocCountError,
143-
0,
144-
format
145-
);
146-
return new StringTerms(
152+
return List.of(new StringTerms(
147153
name,
148154
reduceOrder,
149155
order,
@@ -152,10 +158,10 @@ public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row
152158
bucketCountThresholds.getShardSize(),
153159
showTermDocCountError,
154160
0,
155-
List.of(bucket),
161+
buckets,
156162
0,
157163
bucketCountThresholds
158-
);
164+
));
159165
}
160166

161167
}

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,7 @@ static InternalValuesSource doubleValueSource(ValuesSource.Numeric valuesSource,
709709
@Override
710710
public List<InternalAggregation> convert(Map<String, Object[]> shardResult, SearchContext searchContext) {
711711
int rowCount = shardResult.isEmpty() ? 0 : shardResult.get(fields.getFirst()).length ;
712-
List<InternalMultiTerms.Bucket> buckets = new ArrayList<>();
712+
List<InternalMultiTerms.Bucket> buckets = new ArrayList<>(rowCount);
713713
for (int i = 0; i < rowCount; i++) {
714714
final int j = i;
715715
List<Object> key = fields.stream().map(fieldName -> (Object) searchContext.convertToComparable(shardResult.get(fieldName)[j])).toList();
@@ -725,13 +725,10 @@ public List<InternalAggregation> convert(Map<String, Object[]> shardResult, Sear
725725
}
726726
buckets.add(new InternalMultiTerms.Bucket(key, docCount, InternalAggregations.from(subAggs), showTermDocCountError, 0, formats));
727727
}
728-
// TODO : Not reducing using Priority Queue into top buckets as depending on Substrait plan.
729-
BucketOrder reduceOrder;
728+
BucketOrder reduceOrder = order;
730729
if (isKeyOrder(order) == false) {
731730
reduceOrder = InternalOrder.key(true);
732-
//buckets.sort(reduceOrder.comparator());
733-
} else {
734-
reduceOrder = order;
731+
buckets.sort(reduceOrder.comparator());
735732
}
736733
return Collections.singletonList(new InternalMultiTerms(
737734
name,

0 commit comments

Comments
 (0)