Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowUtils;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.SerializationUtils;
Expand All @@ -58,6 +60,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;

Expand Down Expand Up @@ -185,16 +188,28 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
List<PartitionEntry> partitions =
fileStoreTable.newScan().withLevelFilter(level -> true).listPartitionEntries();

@SuppressWarnings("unchecked")
CastExecutor<InternalRow, BinaryString> partitionCastExecutor =
(CastExecutor<InternalRow, BinaryString>)
CastExecutors.resolveToString(
fileStoreTable.schema().logicalPartitionType());
List<DataType> fieldTypes =
fileStoreTable.schema().logicalPartitionType().getFieldTypes();
InternalRow.FieldGetter[] fieldGetters =
InternalRowUtils.createFieldGetters(fieldTypes);
List<CastExecutor> castExecutors =
fieldTypes.stream()
.map(CastExecutors::resolveToString)
.collect(Collectors.toList());

// sorted by partition
Iterator<InternalRow> iterator =
partitions.stream()
.map(partitionEntry -> toRow(partitionEntry, partitionCastExecutor))
.map(
partitionEntry ->
toRow(
partitionEntry,
fileStoreTable.partitionKeys(),
castExecutors,
fieldGetters,
fileStoreTable
.coreOptions()
.partitionDefaultName()))
.sorted(Comparator.comparing(row -> row.getString(0)))
.iterator();

Expand All @@ -211,9 +226,32 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {

private InternalRow toRow(
PartitionEntry entry,
CastExecutor<InternalRow, BinaryString> partitionCastExecutor) {
List<String> partitionKeys,
List<CastExecutor> castExecutors,
InternalRow.FieldGetter[] fieldGetters,
String defaultPartitionName) {
StringBuilder partitionStringBuilder = new StringBuilder();

for (int i = 0; i < partitionKeys.size(); i++) {
if (i > 0) {
partitionStringBuilder.append("/");
}
Object partitionValue = fieldGetters[i].getFieldOrNull(entry.partition());
String partitionValueString =
partitionValue == null
? defaultPartitionName
: castExecutors
.get(i)
.cast(fieldGetters[i].getFieldOrNull(entry.partition()))
.toString();
partitionStringBuilder
.append(partitionKeys.get(i))
.append("=")
.append(partitionValueString);
}

return GenericRow.of(
partitionCastExecutor.cast(entry.partition()),
BinaryString.fromString(partitionStringBuilder.toString()),
entry.recordCount(),
entry.fileSizeInBytes(),
entry.fileCount(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ public void before() throws Exception {
@Test
public void testPartitionRecordCount() throws Exception {
List<InternalRow> expectedRow = new ArrayList<>();
expectedRow.add(GenericRow.of(BinaryString.fromString("{1}"), 2L));
expectedRow.add(GenericRow.of(BinaryString.fromString("{2}"), 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("{3}"), 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("pt=1"), 2L));
expectedRow.add(GenericRow.of(BinaryString.fromString("pt=2"), 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("pt=3"), 1L));

// Only read partition and record count, record size may not stable.
List<InternalRow> result = read(partitionsTable, new int[] {0, 1});
Expand All @@ -97,8 +97,8 @@ public void testPartitionRecordCount() throws Exception {
@Test
public void testPartitionTimeTravel() throws Exception {
List<InternalRow> expectedRow = new ArrayList<>();
expectedRow.add(GenericRow.of(BinaryString.fromString("{1}"), 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("{3}"), 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("pt=1"), 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("pt=3"), 1L));

// Only read partition and record count, record size may not stable.
List<InternalRow> result =
Expand All @@ -113,9 +113,9 @@ public void testPartitionTimeTravel() throws Exception {
public void testPartitionValue() throws Exception {
write(table, GenericRow.of(2, 1, 3), GenericRow.of(3, 1, 4));
List<InternalRow> expectedRow = new ArrayList<>();
expectedRow.add(GenericRow.of(BinaryString.fromString("{1}"), 4L, 3L));
expectedRow.add(GenericRow.of(BinaryString.fromString("{2}"), 1L, 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("{3}"), 1L, 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("pt=1"), 4L, 3L));
expectedRow.add(GenericRow.of(BinaryString.fromString("pt=2"), 1L, 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("pt=3"), 1L, 1L));

List<InternalRow> result = read(partitionsTable, new int[] {0, 1, 3});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,15 +746,15 @@ public void testBranchPartitionsTable() throws Exception {
sql("INSERT INTO t$branch_b1 VALUES (1, 4, 'S3'), (2, 2, 'S4')");

assertThat(collectResult("SELECT `partition`, record_count, file_count FROM t$partitions"))
.containsExactlyInAnyOrder("+I[{1}, 3, 3]", "+I[{2}, 3, 2]");
.containsExactlyInAnyOrder("+I[a=1, 3, 3]", "+I[a=2, 3, 2]");
assertThat(
collectResult(
"SELECT `partition`, record_count, file_count FROM t$branch_b1$partitions"))
.containsExactlyInAnyOrder("+I[{1}, 2, 2]", "+I[{2}, 3, 2]");
.containsExactlyInAnyOrder("+I[a=1, 2, 2]", "+I[a=2, 3, 2]");
assertThat(
collectResult(
"SELECT `partition`, record_count, file_count FROM t$partitions /*+ OPTIONS('branch'='b1') */"))
.containsExactlyInAnyOrder("+I[{1}, 2, 2]", "+I[{2}, 3, 2]");
.containsExactlyInAnyOrder("+I[a=1, 2, 2]", "+I[a=2, 3, 2]");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ public void testPartitionsTable() {
sql("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", table);
List<Row> result =
sql("SELECT `partition`, record_count, file_count FROM %s$partitions", table);
assertThat(result).containsExactlyInAnyOrder(Row.of("{1}", 2L, 2L), Row.of("{2}", 3L, 2L));
assertThat(result).containsExactlyInAnyOrder(Row.of("p=1", 2L, 2L), Row.of("p=2", 3L, 2L));

// assert new files in partition
sql("INSERT INTO %s VALUES (3, 4, 4, 'S3'), (1, 3, 2, 'S4')", table);
Expand All @@ -1035,10 +1035,10 @@ public void testPartitionsTable() {
table));
assertThat(result)
.containsExactlyInAnyOrder(
Row.of("{1}", 3L, 3L),
Row.of("{2}", 4L, 3L),
Row.of("{3}", 1L, 1L),
Row.of("{4}", 1L, 1L));
Row.of("p=1", 3L, 3L),
Row.of("p=2", 4L, 3L),
Row.of("p=3", 1L, 1L),
Row.of("p=4", 1L, 1L));

// assert delete partitions
sql("ALTER TABLE %s DROP PARTITION (p = 2)", table);
Expand All @@ -1049,7 +1049,7 @@ public void testPartitionsTable() {
table));
assertThat(result)
.containsExactlyInAnyOrder(
Row.of("{1}", 3L, 3L), Row.of("{3}", 1L, 1L), Row.of("{4}", 1L, 1L));
Row.of("p=1", 3L, 3L), Row.of("p=3", 1L, 1L), Row.of("p=4", 1L, 1L));

// add new file to p 2
sql("INSERT INTO %s VALUES (1, 2, 2, 'S1')", table);
Expand All @@ -1060,10 +1060,10 @@ public void testPartitionsTable() {
table));
assertThat(result)
.containsExactlyInAnyOrder(
Row.of("{1}", 3L, 3L),
Row.of("{2}", 1L, 1L),
Row.of("{3}", 1L, 1L),
Row.of("{4}", 1L, 1L));
Row.of("p=1", 3L, 3L),
Row.of("p=2", 1L, 1L),
Row.of("p=3", 1L, 1L),
Row.of("p=4", 1L, 1L));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ class PaimonSystemTableTest extends PaimonSparkTestBase {
checkAnswer(spark.sql("select count(*) from `T$partitions`"), Row(5) :: Nil)
checkAnswer(
spark.sql("select partition from `T$partitions`"),
Row("{2024-10-09, 01}") :: Row("{2024-10-09, 02}") :: Row("{2024-10-10, 01}") :: Row(
"{2024-10-10, 12}") :: Row("{2024-10-10, 23}") :: Nil
Row("dt=2024-10-09/hh=01") :: Row("dt=2024-10-09/hh=02") :: Row("dt=2024-10-10/hh=01") :: Row(
"dt=2024-10-10/hh=12") :: Row("dt=2024-10-10/hh=23") :: Nil
)
}

Expand Down Expand Up @@ -93,7 +93,7 @@ class PaimonSystemTableTest extends PaimonSparkTestBase {

checkAnswer(
sql("SELECT partition FROM `T$partitions`"),
Seq(Row("{2024-10-10, 1}"), Row("{null, 1}")))
Seq(Row("p1=2024-10-10/p2=1"), Row("p1=__DEFAULT_PARTITION__/p2=1")))

checkAnswer(
sql("SELECT partition, bucket FROM `T$buckets`"),
Expand Down
Loading