Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content/primary-key-table/merge-engine/aggregation.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ Current supported aggregate functions and data types are:
The listagg function concatenates multiple string values into a single string.
It supports STRING data type.
Each field that is not part of the primary keys can be given a list agg delimiter through the `fields.<field-name>.list-agg-delimiter` table property; if the property is not set, "," is used as the default.
You can set `fields.<field-name>.distinct=true` to deduplicate the values obtained by splitting on the `fields.<field-name>.list-agg-delimiter` delimiter.

### bool_and
The bool_and function evaluates whether all values in a boolean set are true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;

import java.time.DateTimeException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;
Expand Down Expand Up @@ -419,4 +421,62 @@ public static BinaryString concat(Iterable<BinaryString> inputs) {
}
return BinaryString.fromBytes(result);
}

/**
 * Splits {@code str} on every occurrence of the complete {@code delimiter} byte sequence and
 * keeps empty tokens.
 *
 * <p>An empty {@code str} yields {@code EMPTY_STRING_ARRAY}. A {@code null} or empty {@code
 * delimiter} is replaced by a single space. A separator found directly at the current token
 * start contributes {@link BinaryString#EMPTY_UTF8}, and the bytes remaining after the last
 * separator are always appended as the final token.
 *
 * @param str the string to split
 * @param delimiter the separator to split on, may be {@code null}
 * @return the tokens in their order of appearance
 */
public static BinaryString[] splitByWholeSeparatorPreserveAllTokens(
        BinaryString str, BinaryString delimiter) {
    final int totalBytes = str.getSizeInBytes();
    final MemorySegment[] segments = str.getSegments();
    final int base = str.getOffset();

    if (totalBytes == 0) {
        return EMPTY_STRING_ARRAY;
    }

    if (delimiter == null || BinaryString.EMPTY_UTF8.equals(delimiter)) {
        // No usable delimiter: fall back to a single space as the separator.
        return splitByWholeSeparatorPreserveAllTokens(str, fromString(" "));
    }

    final int sepSize = delimiter.getSizeInBytes();
    final MemorySegment[] sepSegs = delimiter.getSegments();
    final int sepOffset = delimiter.getOffset();

    final List<BinaryString> tokens = new ArrayList<>();
    // Start of the token currently being scanned, relative to the beginning of 'str'.
    int tokenStart = 0;
    while (true) {
        // Position of the next separator relative to the beginning of 'str';
        // negative when no further separator exists.
        final int hit =
                MemorySegmentUtils.find(
                                segments,
                                base + tokenStart,
                                totalBytes - tokenStart,
                                sepSegs,
                                sepOffset,
                                sepSize)
                        - base;

        if (hit < 0) {
            // No separator left: everything from 'tokenStart' to the end is the last token.
            tokens.add(
                    BinaryString.fromAddress(
                            segments, base + tokenStart, totalBytes - tokenStart));
            return tokens.toArray(new BinaryString[0]);
        }

        // The token spans [tokenStart, hit); a separator sitting right at 'tokenStart'
        // produces an empty token.
        tokens.add(
                hit > tokenStart
                        ? BinaryString.fromAddress(
                                segments, base + tokenStart, hit - tokenStart)
                        : BinaryString.EMPTY_UTF8);

        // Resume the search right behind the separator that was just consumed.
        tokenStart = hit + sepSize;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.BinaryStringUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.List;

/** listagg aggregate a field of a row. */
public class FieldListaggAgg extends FieldAggregator {

Expand All @@ -49,10 +54,45 @@ public Object agg(Object accumulator, Object inputField) {
BinaryString mergeFieldSD = (BinaryString) accumulator;
BinaryString inFieldSD = (BinaryString) inputField;

if (distinct && inFieldSD.getSizeInBytes() > 0 && mergeFieldSD.contains(inFieldSD)) {
if (inFieldSD.getSizeInBytes() <= 0) {
return mergeFieldSD;
}

if (mergeFieldSD.getSizeInBytes() <= 0) {
return inFieldSD;
}

if (distinct) {
BinaryString delimiterBinaryString = BinaryString.fromString(delimiter);
BinaryString[] binaryStrings =
BinaryStringUtils.splitByWholeSeparatorPreserveAllTokens(
inFieldSD, delimiterBinaryString);

List<BinaryString> concatItems =
Arrays.stream(binaryStrings)
.filter(it -> it.getSizeInBytes() > 0 && !mergeFieldSD.contains(it))
.collect(
() -> Lists.newArrayList(mergeFieldSD),
(acc, r) -> {
if (!acc.isEmpty()) {
acc.add(delimiterBinaryString);
}
acc.add(r);
},
(l, r) -> {
if (!l.isEmpty() && !r.isEmpty()) {
l.add(delimiterBinaryString);
}
l.addAll(r);
});

if (concatItems.size() == 1) {
return concatItems.get(0);
}

return BinaryStringUtils.concat(concatItems);
}

return BinaryStringUtils.concat(
mergeFieldSD, BinaryString.fromString(delimiter), inFieldSD);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,153 @@ public void testFieldListAggWithDefaultDelimiterAndDistinct() {
assertEquals("user1,user2,user3", result.toString());
}

/** Two empty strings aggregated with distinct enabled and ";" as delimiter stay empty. */
@Test
public void testFieldListAggWithCustomDelimiterAndEmptyStrings() {
    CoreOptions options =
            CoreOptions.fromMap(
                    ImmutableMap.of(
                            "fields.fieldName.distinct",
                            "true",
                            "fields.fieldName.list-agg-delimiter",
                            ";"));
    FieldListaggAgg aggregator =
            new FieldListaggAggFactory().create(new VarCharType(), options, "fieldName");

    // Only two inputs, so a single aggregation step covers the whole sequence.
    BinaryString merged =
            (BinaryString)
                    aggregator.agg(BinaryString.fromString(""), BinaryString.fromString(""));

    assertNotNull(merged);
    assertEquals("", merged.toString());
}

/**
 * With distinct enabled and no delimiter configured, folds "user1", "user2" and "user1,user3"
 * from left to right and expects "user1,user2,user3".
 */
@Test
public void testFieldListAggWithDefaultDelimiterAndDistinctWithMultiUser() {
    CoreOptions options =
            CoreOptions.fromMap(ImmutableMap.of("fields.fieldName.distinct", "true"));
    FieldListaggAgg aggregator =
            new FieldListaggAggFactory().create(new VarCharType(), options, "fieldName");

    BinaryString[] inputs = {
        BinaryString.fromString("user1"),
        BinaryString.fromString("user2"),
        BinaryString.fromString("user1,user3")
    };

    // Left fold: the first input seeds the accumulator, the rest are merged in order.
    BinaryString merged = inputs[0];
    for (int i = 1; i < inputs.length; i++) {
        merged = (BinaryString) aggregator.agg(merged, inputs[i]);
    }

    assertNotNull(merged);
    assertEquals("user1,user2,user3", merged.toString());
}

/**
 * With distinct enabled and no delimiter configured, folds "", "user2" and "user1,user3" from
 * left to right and expects "user2,user1,user3".
 */
@Test
public void testFieldListAggWithDefaultDelimiterAndDistinctWithEmptyLeftUser() {
    CoreOptions options =
            CoreOptions.fromMap(ImmutableMap.of("fields.fieldName.distinct", "true"));
    FieldListaggAgg aggregator =
            new FieldListaggAggFactory().create(new VarCharType(), options, "fieldName");

    BinaryString[] inputs = {
        BinaryString.fromString(""),
        BinaryString.fromString("user2"),
        BinaryString.fromString("user1,user3")
    };

    // Left fold: the first input seeds the accumulator, the rest are merged in order.
    BinaryString merged = inputs[0];
    for (int i = 1; i < inputs.length; i++) {
        merged = (BinaryString) aggregator.agg(merged, inputs[i]);
    }

    assertNotNull(merged);
    assertEquals("user2,user1,user3", merged.toString());
}

/**
 * With distinct enabled and ";" as delimiter, folds "k1=v1;k2=v2", "k1=v1;k3=v3" and "" from
 * left to right and expects "k1=v1;k2=v2;k3=v3".
 */
@Test
public void testFieldListAggWithCustomDelimiterAndDistinctWithMultiKvString() {
    CoreOptions options =
            CoreOptions.fromMap(
                    ImmutableMap.of(
                            "fields.fieldName.distinct",
                            "true",
                            "fields.fieldName.list-agg-delimiter",
                            ";"));
    FieldListaggAgg aggregator =
            new FieldListaggAggFactory().create(new VarCharType(), options, "fieldName");

    BinaryString[] inputs = {
        BinaryString.fromString("k1=v1;k2=v2"),
        BinaryString.fromString("k1=v1;k3=v3"),
        BinaryString.fromString("")
    };

    // Left fold: the first input seeds the accumulator, the rest are merged in order.
    BinaryString merged = inputs[0];
    for (int i = 1; i < inputs.length; i++) {
        merged = (BinaryString) aggregator.agg(merged, inputs[i]);
    }

    assertNotNull(merged);
    assertEquals("k1=v1;k2=v2;k3=v3", merged.toString());
}

/**
 * With distinct enabled and a single space as delimiter, folds "k1=v1 k2=v2", " k1=v1 k3=v3"
 * and " " from left to right and expects "k1=v1 k2=v2 k3=v3".
 */
@Test
public void testFieldListAggWithCustomDelimiterDistinctMultiKvStringWithWhiteSpace() {
    CoreOptions options =
            CoreOptions.fromMap(
                    ImmutableMap.of(
                            "fields.fieldName.distinct",
                            "true",
                            "fields.fieldName.list-agg-delimiter",
                            " "));
    FieldListaggAgg aggregator =
            new FieldListaggAggFactory().create(new VarCharType(), options, "fieldName");

    BinaryString[] inputs = {
        BinaryString.fromString("k1=v1 k2=v2"),
        BinaryString.fromString(" k1=v1 k3=v3"),
        BinaryString.fromString(" ")
    };

    // Left fold: the first input seeds the accumulator, the rest are merged in order.
    BinaryString merged = inputs[0];
    for (int i = 1; i < inputs.length; i++) {
        merged = (BinaryString) aggregator.agg(merged, inputs[i]);
    }

    assertNotNull(merged);
    assertEquals("k1=v1 k2=v2 k3=v3", merged.toString());
}

/**
 * With distinct enabled and no delimiter configured, folds "k1=v1,k2=v2", "k1=v1,k2=v3" and ""
 * from left to right and expects "k1=v1,k2=v2,k2=v3".
 */
@Test
public void testFieldListAggWithDefaultDelimiterAndDistinctWithMultiDuplicatedKvString() {
    CoreOptions options =
            CoreOptions.fromMap(ImmutableMap.of("fields.fieldName.distinct", "true"));
    FieldListaggAgg aggregator =
            new FieldListaggAggFactory().create(new VarCharType(), options, "fieldName");

    BinaryString[] inputs = {
        BinaryString.fromString("k1=v1,k2=v2"),
        BinaryString.fromString("k1=v1,k2=v3"),
        BinaryString.fromString("")
    };

    // Left fold: the first input seeds the accumulator, the rest are merged in order.
    BinaryString merged = inputs[0];
    for (int i = 1; i < inputs.length; i++) {
        merged = (BinaryString) aggregator.agg(merged, inputs[i]);
    }

    assertNotNull(merged);
    assertEquals("k1=v1,k2=v2,k2=v3", merged.toString());
}

@Test
public void testFieldListAggWithCustomDelimiter() {
FieldListaggAgg fieldListaggAgg =
Expand Down