Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/src/main/sphinx/connector/hudi.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,21 @@ GROUP BY dt;
(1 rows)
```

### Time travel queries

The connector offers the ability to query historical data. This allows viewing
the state of the table at a previous snapshot.

To read historical data, specify the timestamp of the latest commit to include
by using the `FOR VERSION AS OF` clause:

```sql
SELECT * FROM hudi.default.table_name FOR VERSION AS OF '20251027183851494';
```

Note: Hudi identifies each table version by its commit time, formatted as
`yyyyMMddHHmmssSSS` in the table's time zone. The version must be passed as a
string, but it does not have to match that format: Hudi performs a string
comparison between the passed-in string and the commit time.

### Schema and table management

Hudi supports [two types of tables](https://hudi.apache.org/docs/table_types)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.metastore.Column;
Expand All @@ -33,6 +34,7 @@
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.PointerType;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
Expand Down Expand Up @@ -104,17 +106,14 @@ public List<String> listSchemaNames(ConnectorSession session)
@Override
public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional<ConnectorTableVersion> startVersion, Optional<ConnectorTableVersion> endVersion)
{
if (startVersion.isPresent() || endVersion.isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables");
}

if (isHiveSystemSchema(tableName.getSchemaName())) {
return null;
}
Optional<Table> table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName());
if (table.isEmpty()) {
return null;
}
Optional<String> readVersion = getReadVersion(startVersion, endVersion);
if (!isHudiTable(table.get())) {
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, format("Not a Hudi table: %s", tableName));
}
Expand All @@ -130,7 +129,30 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
COPY_ON_WRITE,
getPartitionKeyColumnHandles(table.get(), typeManager),
TupleDomain.all(),
TupleDomain.all());
TupleDomain.all(),
readVersion);
}

/**
 * Resolves the Hudi version to read from the optional table versions of a query.
 *
 * @param startVersion start of a version range; rejected when present
 * @param endVersion version named by the query, if any
 * @return the version converted to a UTF-8 string, or empty when no end version is given
 * @throws TrinoException with NOT_SUPPORTED when a start version is present, when the
 *         pointer type is TEMPORAL, or when the version value is not a Slice
 */
private static Optional<String> getReadVersion(Optional<ConnectorTableVersion> startVersion, Optional<ConnectorTableVersion> endVersion)
{
// Only a single end version is accepted; a start version is rejected.
if (startVersion.isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "Read table with start version is not supported");
}

// No version was requested for this read.
if (endVersion.isEmpty()) {
return Optional.empty();
}

ConnectorTableVersion version = endVersion.get();
// Temporal pointers are rejected; the error message directs users to 'VERSION'.
if (version.getPointerType() == PointerType.TEMPORAL) {
throw new TrinoException(NOT_SUPPORTED, "Cannot read 'TIMESTAMP' of Hudi table, use 'VERSION' instead");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new TrinoException(NOT_SUPPORTED, "Cannot read 'TIMESTAMP' of Hudi table, use 'VERSION' instead");
throw new TrinoException(NOT_SUPPORTED, "This connector does not support reading tables with TIMESTAMP AS OF");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, is this limitation(reading with temporal) coming from Hudi, or is it something that could be supported in the future?
If it's the latter, could you file an issue and add a TODO here with a link to it?

}

if (version.getVersion() instanceof Slice slice) {
return Optional.of(slice.toStringUtf8());
}

throw new TrinoException(NOT_SUPPORTED, "Provided read version must be a string");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieTableType;

import java.util.List;
import java.util.Optional;
import java.util.Set;

import static io.trino.spi.connector.SchemaTableName.schemaTableName;
Expand All @@ -41,6 +42,7 @@ public class HudiTableHandle
private final Set<HiveColumnHandle> constraintColumns;
private final TupleDomain<HiveColumnHandle> partitionPredicates;
private final TupleDomain<HiveColumnHandle> regularPredicates;
private final Optional<String> readVersion;

@JsonCreator
public HudiTableHandle(
Expand All @@ -50,9 +52,10 @@ public HudiTableHandle(
@JsonProperty("tableType") HoodieTableType tableType,
@JsonProperty("partitionColumns") List<HiveColumnHandle> partitionColumns,
@JsonProperty("partitionPredicates") TupleDomain<HiveColumnHandle> partitionPredicates,
@JsonProperty("regularPredicates") TupleDomain<HiveColumnHandle> regularPredicates)
@JsonProperty("regularPredicates") TupleDomain<HiveColumnHandle> regularPredicates,
@JsonProperty("readVersion") Optional<String> readVersion)
{
this(schemaName, tableName, basePath, tableType, partitionColumns, ImmutableSet.of(), partitionPredicates, regularPredicates);
this(schemaName, tableName, basePath, tableType, partitionColumns, ImmutableSet.of(), partitionPredicates, regularPredicates, readVersion);
}

public HudiTableHandle(
Expand All @@ -63,7 +66,8 @@ public HudiTableHandle(
List<HiveColumnHandle> partitionColumns,
Set<HiveColumnHandle> constraintColumns,
TupleDomain<HiveColumnHandle> partitionPredicates,
TupleDomain<HiveColumnHandle> regularPredicates)
TupleDomain<HiveColumnHandle> regularPredicates,
Optional<String> readVersion)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
Expand All @@ -73,6 +77,7 @@ public HudiTableHandle(
this.constraintColumns = requireNonNull(constraintColumns, "constraintColumns is null");
this.partitionPredicates = requireNonNull(partitionPredicates, "partitionPredicates is null");
this.regularPredicates = requireNonNull(regularPredicates, "regularPredicates is null");
this.readVersion = requireNonNull(readVersion, "readVersion is null");
}

@JsonProperty
Expand Down Expand Up @@ -124,6 +129,12 @@ public TupleDomain<HiveColumnHandle> getRegularPredicates()
return regularPredicates;
}

/**
 * Returns the table version this handle reads, exactly as supplied to the constructor.
 *
 * @return the requested read version, or empty when none was supplied
 */
@JsonProperty
public Optional<String> getReadVersion()
{
    return this.readVersion;
}

public SchemaTableName getSchemaTableName()
{
return schemaTableName(schemaName, tableName);
Expand All @@ -142,7 +153,8 @@ HudiTableHandle applyPredicates(
partitionColumns,
constraintColumns,
partitionPredicates.intersect(partitionTupleDomain),
regularPredicates.intersect(regularTupleDomain));
regularPredicates.intersect(regularTupleDomain),
readVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;
import static org.apache.hudi.common.table.view.HoodieTableFileSystemView.fileListingBasedFileSystemView;

public class HudiReadOptimizedDirectoryLister
Expand All @@ -51,6 +53,7 @@ public class HudiReadOptimizedDirectoryLister
private final HoodieTableFileSystemView fileSystemView;
private final List<Column> partitionColumns;
private final Map<String, HudiPartitionInfo> allPartitionInfoMap;
private final HudiTableHandle tableHandle;

public HudiReadOptimizedDirectoryLister(
HudiTableHandle tableHandle,
Expand All @@ -76,13 +79,17 @@ public HudiReadOptimizedDirectoryLister(
tableHandle.getPartitionPredicates(),
hiveTable,
hiveMetastore)));
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
}

@Override
public List<HudiFileStatus> listStatus(HudiPartitionInfo partitionInfo)
{
LOG.debug("List partition: partitionInfo=%s", partitionInfo);
return fileSystemView.getLatestBaseFiles(partitionInfo.getRelativePartitionPath())
Stream<HoodieBaseFile> baseFileStream = tableHandle.getReadVersion().isEmpty()
? fileSystemView.getLatestBaseFiles(partitionInfo.getRelativePartitionPath())
: fileSystemView.getLatestBaseFilesBeforeOrOn(partitionInfo.getRelativePartitionPath(), tableHandle.getReadVersion().get());
return baseFileStream
.map(HudiReadOptimizedDirectoryLister::getStoragePathInfo)
.map(fileEntry -> new HudiFileStatus(
Location.of(fileEntry.getPath().toString()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public static void main(String[] args)
hiveMinioDataLake.start();
QueryRunner queryRunner = builder(hiveMinioDataLake)
.addCoordinatorProperty("http-server.http.port", "8080")
.setDataLoader(new TpchHudiTablesInitializer(TpchTable.getTables()))
.setDataLoader(new TpchHudiTablesInitializer(TpchTable.getTables(), "0"))
.build();

log.info("======== SERVER STARTED ========");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
public class TestHudiConnectorTest
extends BaseConnectorTest
{
private static final long COMMIT_TIMESTAMP = 20251027183851494L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the naming aligned with the Hudi glossary? Using "commit timestamp" sounds more like a temporal read; it is actually more like a "commit version".


@Override
protected QueryRunner createQueryRunner()
throws Exception
{
// The data loader is given COMMIT_TIMESTAMP as a string, which lets tests in this
// class reference that same value in FOR VERSION AS OF queries.
return HudiQueryRunner.builder()
.addConnectorProperty("hudi.columns-to-hide", COLUMNS_TO_HIDE)
.setDataLoader(new TpchHudiTablesInitializer(REQUIRED_TPCH_TABLES))
.setDataLoader(new TpchHudiTablesInitializer(REQUIRED_TPCH_TABLES, String.valueOf(COMMIT_TIMESTAMP)))
.build();
}

Expand Down Expand Up @@ -88,4 +90,24 @@ public void testHideHiveSysSchema()
assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).doesNotContain("sys");
assertQueryFails("SHOW TABLES IN hudi.sys", ".*Schema 'sys' does not exist");
}

/**
 * Accepts a versioned-query failure only when its message is one of the
 * messages listed in the pattern below.
 */
@Override
protected void verifyVersionedQueryFailurePermissible(Exception e)
{
    // Alternation of the permitted failure messages.
    String permittedMessagePattern = "Read table with start version is not supported|" +
            "Cannot read 'TIMESTAMP' of Hudi table, use 'VERSION' instead|" +
            "Provided read version must be a string";
    assertThat(e).hasMessageMatching(permittedMessagePattern);
}

/**
 * Exercises FOR VERSION AS OF and FOR TIMESTAMP AS OF reads of the nation table
 * against the commit version the test data was loaded with.
 */
@Test
public void testSelectTableUsingTargetIdVersion()
{
String expectedValues = "VALUES 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24";
// Baseline: an unversioned read returns all 25 nation keys.
assertThat(query("SELECT CAST(nationkey AS INT) FROM hudi.tests.nation")).matches(expectedValues);
// Reading at exactly the load commit version returns the same rows.
assertThat(query("SELECT CAST(nationkey AS INT) FROM hudi.tests.nation FOR VERSION AS OF '" + COMMIT_TIMESTAMP + "'")).matches(expectedValues);
// A version one less than the load commit version yields no rows.
assertThat(query("SELECT CAST(nationkey AS INT) FROM hudi.tests.nation FOR VERSION AS OF '" + (COMMIT_TIMESTAMP - 1) + "'")).returnsEmptyResult();
// A numeric (non-string) version is rejected.
assertQueryFails("SELECT CAST(nationkey AS INT) FROM hudi.tests.nation FOR VERSION AS OF 0", "Provided read version must be a string");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add a case that is negative number/string?

// A FOR TIMESTAMP AS OF read is rejected with a message pointing to VERSION.
assertQueryFails("SELECT CAST(nationkey AS INT) FROM hudi.tests.nation FOR TIMESTAMP AS OF TIMESTAMP '2025-10-29 15:00:00'", "Cannot read 'TIMESTAMP' of Hudi table, use 'VERSION' instead");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected QueryRunner createQueryRunner()

return HudiQueryRunner.builder(hiveMinioDataLake)
.addConnectorProperty("hudi.columns-to-hide", COLUMNS_TO_HIDE)
.setDataLoader(new TpchHudiTablesInitializer(REQUIRED_TPCH_TABLES))
.setDataLoader(new TpchHudiTablesInitializer(REQUIRED_TPCH_TABLES, "0"))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected QueryRunner createQueryRunner()

queryRunner.execute("CREATE SCHEMA hive.default");

TpchHudiTablesInitializer tpchHudiTablesInitializer = new TpchHudiTablesInitializer(List.of(NATION));
TpchHudiTablesInitializer tpchHudiTablesInitializer = new TpchHudiTablesInitializer(List.of(NATION), "0");
tpchHudiTablesInitializer.initializeTables(queryRunner, Location.of(dataDirectory.toString()), "default");

copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, hiveSession, ImmutableList.of(TpchTable.REGION));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,12 @@ public class TpchHudiTablesInitializer
private static final HdfsContext CONTEXT = new HdfsContext(SESSION);

private final List<TpchTable<?>> tpchTables;
private final String commitTimestamp;

/**
 * Creates an initializer for the given TPCH tables.
 *
 * @param tpchTables tables handled by this initializer; must not be null
 * @param commitTimestamp value stored for use as the commit time when records are
 *        written; must not be null
 */
public TpchHudiTablesInitializer(List<TpchTable<?>> tpchTables)
public TpchHudiTablesInitializer(List<TpchTable<?>> tpchTables, String commitTimestamp)
{
this.tpchTables = requireNonNull(tpchTables, "tpchTables is null");
this.commitTimestamp = requireNonNull(commitTimestamp, "commitTimestamp is null");
}

@Override
Expand Down Expand Up @@ -166,9 +168,8 @@ public void load(TpchTable<?> tpchTables, QueryRunner queryRunner, java.nio.file
.map(MaterializedRow::getFields)
.map(recordConverter::toRecord)
.collect(Collectors.toList());
String timestamp = "0";
writeClient.startCommitWithTime(timestamp);
writeClient.insert(records, timestamp);
writeClient.startCommitWithTime(commitTimestamp);
writeClient.insert(records, commitTimestamp);
}
}

Expand Down