diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/protobuf/ProtobufDeserializerFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/protobuf/ProtobufDeserializerFactory.java
index 14b1d18e7792..3381fa0dcc8e 100644
--- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/protobuf/ProtobufDeserializerFactory.java
+++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/protobuf/ProtobufDeserializerFactory.java
@@ -53,6 +53,7 @@ public class ProtobufDeserializerFactory
         implements LineDeserializerFactory
 {
+    public static final String SERIALIZATION_CLASS = "serialization.class";
     private LoadingCache<String, Descriptor> cache;
 
     public ProtobufDeserializerFactory(Path descriptorsDirectory, Duration updateInterval, long maximumSize)
@@ -80,7 +81,7 @@ public ProtobufDeserializer create(List<Column> columns, Map<String, String> serdeProperties)
             throw new TrinoException(CONFIGURATION_INVALID, "No \"hive.protobufs.descriptors\" set in hive configuration");
         }
 
-        String serializationClass = serdeProperties.get("serialization.class");
+        String serializationClass = serdeProperties.get(SERIALIZATION_CLASS);
         if (serializationClass == null) {
             throw new TrinoException(HIVE_INVALID_METADATA, "Missing serdeproperties key \"serialization.class\"");
         }
@@ -88,7 +89,12 @@ else if (!serializationClass.matches("^[^$]+\\$[^$]+$")) {
             throw new TrinoException(HIVE_INVALID_METADATA, String.format("Expected serialization.class to contain {package}${protoname}, but was %s", serializationClass));
         }
 
-        return new ProtobufDeserializer(columns, cache.getUnchecked(serializationClass));
+        return new ProtobufDeserializer(columns, getDescriptor(serializationClass));
+    }
+
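+    /**
+     * Exposes the cached descriptor lookup so callers outside the line
+     * deserialization path (such as dynamic schema inference in the
+     * metastore layer) can resolve a Descriptor from a serialization class.
+     */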
+    public Descriptor getDescriptor(String serializationClass)
+    {
+        return cache.getUnchecked(serializationClass);
     }
 
     private static class DescriptorCacheLoader
diff --git a/lib/trino-metastore/src/main/java/io/trino/metastore/HiveMetastoreWrapper.java b/lib/trino-metastore/src/main/java/io/trino/metastore/HiveMetastoreWrapper.java
new file mode 100644
index 000000000000..721801d2b6d1
--- /dev/null
+++ b/lib/trino-metastore/src/main/java/io/trino/metastore/HiveMetastoreWrapper.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.metastore;
+
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.function.LanguageFunction;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.security.RoleGrant;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
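+/**
+ * Forwarding base class: every {@link HiveMetastore} method delegates to the
+ * wrapped instance, so decorators only need to override the calls they care
+ * about (see {@code DynamicSchemaHiveMetastore} in the Hive plugin).
+ */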
+public class HiveMetastoreWrapper
+        implements HiveMetastore
+{
+    protected final HiveMetastore delegate;
+
+    public HiveMetastoreWrapper(HiveMetastore delegate)
+    {
+        this.delegate = requireNonNull(delegate, "delegate is null");
+    }
+
+    @Override
+    public void abortTransaction(long transactionId)
+    {
+        delegate.abortTransaction(transactionId);
+    }
+
+    @Override
+    public void acquireSharedReadLock(AcidTransactionOwner transactionOwner, String queryId, long transactionId, List<SchemaTableName> fullTables, List<HivePartition> partitions)
+    {
+        delegate.acquireSharedReadLock(transactionOwner, queryId, transactionId, fullTables, partitions);
+    }
+
+    @Override
+    public void acquireTableWriteLock(AcidTransactionOwner transactionOwner, String queryId, long transactionId, String dbName, String tableName, AcidOperation operation, boolean isDynamicPartitionWrite)
+    {
+        delegate.acquireTableWriteLock(transactionOwner, queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite);
+    }
+
+    @Override
+    public void addColumn(String databaseName, String tableName, String columnName, HiveType columnType, String columnComment)
+    {
+        delegate.addColumn(databaseName, tableName, columnName, columnType, columnComment);
+    }
+
+    @Override
+    public void addDynamicPartitions(String dbName, String tableName, List<String> partitionNames, long transactionId, long writeId, AcidOperation operation)
+    {
+        delegate.addDynamicPartitions(dbName, tableName, partitionNames, transactionId, writeId, operation);
+    }
+
+    @Override
+    public void addPartitions(String databaseName, String tableName, List<PartitionWithStatistics> partitions)
+    {
+        delegate.addPartitions(databaseName, tableName, partitions);
+    }
+
+    @Override
+    public long allocateWriteId(String dbName, String tableName, long transactionId)
+    {
+        return delegate.allocateWriteId(dbName, tableName, transactionId);
+    }
+
+    @Override
+    public void alterPartition(String databaseName, String tableName, PartitionWithStatistics partition)
+    {
+        delegate.alterPartition(databaseName, tableName, partition);
+    }
+
+    @Override
+    public void alterTransactionalTable(Table table, long transactionId, long writeId, PrincipalPrivileges principalPrivileges)
+    {
+        delegate.alterTransactionalTable(table, transactionId, writeId, principalPrivileges);
+    }
+
+    @Override
+    public void checkSupportsTransactions()
+    {
+        delegate.checkSupportsTransactions();
+    }
+
+    @Override
+    public void commentColumn(String databaseName, String tableName, String columnName, Optional<String> comment)
+    {
+        delegate.commentColumn(databaseName, tableName, columnName, comment);
+    }
+
+    @Override
+    public void commentTable(String databaseName, String tableName, Optional<String> comment)
+    {
+        delegate.commentTable(databaseName, tableName, comment);
+    }
+
+    @Override
+    public void commitTransaction(long transactionId)
+    {
+        delegate.commitTransaction(transactionId);
+    }
+
+    @Override
+    public void createDatabase(Database database)
+    {
+        delegate.createDatabase(database);
+    }
+
+    @Override
+    public void createFunction(String databaseName, String functionName, LanguageFunction function)
+    {
+        delegate.createFunction(databaseName, functionName, function);
+    }
+
+    @Override
+    public void createRole(String role, String grantor)
+    {
+        delegate.createRole(role, grantor);
+    }
+
+    @Override
+    public void createTable(Table table, PrincipalPrivileges principalPrivileges)
+    {
+        delegate.createTable(table, principalPrivileges);
+    }
+
+    @Override
+    public void dropColumn(String databaseName, String tableName, String columnName)
+    {
+        delegate.dropColumn(databaseName, tableName, columnName);
+    }
+
+    @Override
+    public void dropDatabase(String databaseName, boolean deleteData)
+    {
+        delegate.dropDatabase(databaseName, deleteData);
+    }
+
+    @Override
+    public void dropFunction(String databaseName, String functionName, String signatureToken)
+    {
+        delegate.dropFunction(databaseName, functionName, signatureToken);
+    }
+
+    @Override
+    public void dropPartition(String databaseName, String tableName, List<String> parts, boolean deleteData)
+    {
+        delegate.dropPartition(databaseName, tableName, parts, deleteData);
+    }
+
+    @Override
+    public void dropRole(String role)
+    {
+        delegate.dropRole(role);
+    }
+
+    @Override
+    public void dropTable(String databaseName, String tableName, boolean deleteData)
+    {
+        delegate.dropTable(databaseName, tableName, deleteData);
+    }
+
+    @Override
+    public boolean functionExists(String databaseName, String functionName, String signatureToken)
+    {
+        return delegate.functionExists(databaseName, functionName, signatureToken);
+    }
+
+    @Override
+    public List<String> getAllDatabases()
+    {
+        return delegate.getAllDatabases();
+    }
+
+    @Override
+    public Collection<LanguageFunction> getAllFunctions(String databaseName)
+    {
+        return delegate.getAllFunctions(databaseName);
+    }
+
+    @Override
+    public Optional<String> getConfigValue(String name)
+    {
+        return delegate.getConfigValue(name);
+    }
+
+    @Override
+    public Optional<Database> getDatabase(String databaseName)
+    {
+        return delegate.getDatabase(databaseName);
+    }
+
+    @Override
+    public Collection<LanguageFunction> getFunctions(String databaseName, String functionName)
+    {
+        return delegate.getFunctions(databaseName, functionName);
+    }
+
+    @Override
+    public Optional<Partition> getPartition(Table table, List<String> partitionValues)
+    {
+        return delegate.getPartition(table, partitionValues);
+    }
+
+    @Override
+    public Map<String, Map<String, HiveColumnStatistics>> getPartitionColumnStatistics(String databaseName, String tableName, Set<String> partitionNames, Set<String> columnNames)
+    {
+        return delegate.getPartitionColumnStatistics(databaseName, tableName, partitionNames, columnNames);
+    }
+
+    @Override
+    public Optional<List<String>> getPartitionNamesByFilter(String databaseName, String tableName, List<String> columnNames, TupleDomain<String> partitionKeysFilter)
+    {
+        return delegate.getPartitionNamesByFilter(databaseName, tableName, columnNames, partitionKeysFilter);
+    }
+
+    @Override
+    public Map<String, Optional<Partition>> getPartitionsByNames(Table table, List<String> partitionNames)
+    {
+        return delegate.getPartitionsByNames(table, partitionNames);
+    }
+
+    @Override
+    public Optional<Table> getTable(String databaseName, String tableName)
+    {
+        return delegate.getTable(databaseName, tableName);
+    }
+
+    @Override
+    public Map<String, HiveColumnStatistics> getTableColumnStatistics(String databaseName, String tableName, Set<String> columnNames)
+    {
+        return delegate.getTableColumnStatistics(databaseName, tableName, columnNames);
+    }
+
+    @Override
+    public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, Set<String> parameterValues)
+    {
+        return delegate.getTableNamesWithParameters(databaseName, parameterKey, parameterValues);
+    }
+
+    @Override
+    public List<TableInfo> getTables(String databaseName)
+    {
+        return delegate.getTables(databaseName);
+    }
+
+    @Override
+    public String getValidWriteIds(List<SchemaTableName> tables, long currentTransactionId)
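+        <!-- Descriptors API used by the dynamic-schema metastore wrapper -->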
+    {
+        return delegate.getValidWriteIds(tables, currentTransactionId);
+    }
+
+    @Override
+    public void grantRoles(Set<String> roles, Set<HivePrincipal> grantees, boolean adminOption, HivePrincipal grantor)
+    {
+        delegate.grantRoles(roles, grantees, adminOption, grantor);
+    }
+
+    @Override
+    public void grantTablePrivileges(String databaseName, String tableName, String tableOwner, HivePrincipal grantee, HivePrincipal grantor, Set<HivePrivilegeInfo.HivePrivilege> privileges, boolean grantOption)
+    {
+        delegate.grantTablePrivileges(databaseName, tableName, tableOwner, grantee, grantor, privileges, grantOption);
+    }
+
+    @Override
+    public Set<RoleGrant> listRoleGrants(HivePrincipal principal)
+    {
+        return delegate.listRoleGrants(principal);
+    }
+
+    @Override
+    public Set<String> listRoles()
+    {
+        return delegate.listRoles();
+    }
+
+    @Override
+    public Set<HivePrivilegeInfo> listTablePrivileges(String databaseName, String tableName, Optional<String> tableOwner, Optional<HivePrincipal> principal)
+    {
+        return delegate.listTablePrivileges(databaseName, tableName, tableOwner, principal);
+    }
+
+    @Override
+    public long openTransaction(AcidTransactionOwner transactionOwner)
+    {
+        return delegate.openTransaction(transactionOwner);
+    }
+
+    @Override
+    public void renameColumn(String databaseName, String tableName, String oldColumnName, String newColumnName)
+    {
+        delegate.renameColumn(databaseName, tableName, oldColumnName, newColumnName);
+    }
+
+    @Override
+    public void renameDatabase(String databaseName, String newDatabaseName)
+    {
+        delegate.renameDatabase(databaseName, newDatabaseName);
+    }
+
+    @Override
+    public void renameTable(String databaseName, String tableName, String newDatabaseName, String newTableName)
+    {
+        delegate.renameTable(databaseName, tableName, newDatabaseName, newTableName);
+    }
+
+    @Override
+    public void replaceFunction(String databaseName, String functionName, LanguageFunction function)
+    {
+        delegate.replaceFunction(databaseName, functionName, function);
+    }
+
+    @Override
+    public void replaceTable(String databaseName, String tableName, Table newTable, PrincipalPrivileges principalPrivileges, Map<String, String> environmentContext)
+    {
+        delegate.replaceTable(databaseName, tableName, newTable, principalPrivileges, environmentContext);
+    }
+
+    @Override
+    public void revokeRoles(Set<String> roles, Set<HivePrincipal> grantees, boolean adminOption, HivePrincipal grantor)
+    {
+        delegate.revokeRoles(roles, grantees, adminOption, grantor);
+    }
+
+    @Override
+    public void revokeTablePrivileges(String databaseName, String tableName, String tableOwner, HivePrincipal grantee, HivePrincipal grantor, Set<HivePrivilegeInfo.HivePrivilege> privileges, boolean grantOption)
+    {
+        delegate.revokeTablePrivileges(databaseName, tableName, tableOwner, grantee, grantor, privileges, grantOption);
+    }
+
+    @Override
+    public void sendTransactionHeartbeat(long transactionId)
+    {
+        delegate.sendTransactionHeartbeat(transactionId);
+    }
+
+    @Override
+    public void setDatabaseOwner(String databaseName, HivePrincipal principal)
+    {
+        delegate.setDatabaseOwner(databaseName, principal);
+    }
+
+    @Override
+    public void setTableOwner(String databaseName, String tableName, HivePrincipal principal)
+    {
+        delegate.setTableOwner(databaseName, tableName, principal);
+    }
+
+    @Override
+    public void updatePartitionStatistics(Table table, StatisticsUpdateMode mode, Map<String, PartitionStatistics> partitionUpdates)
+    {
+        delegate.updatePartitionStatistics(table, mode, partitionUpdates);
+    }
+
+    @Override
+    public void updateTableStatistics(String databaseName, String tableName, OptionalLong acidWriteId, StatisticsUpdateMode mode, PartitionStatistics statisticsUpdate)
+    {
+        delegate.updateTableStatistics(databaseName, tableName, acidWriteId, mode, statisticsUpdate);
+    }
+
+    @Override
+    public void updateTableWriteId(String dbName, String tableName, long transactionId, long writeId, OptionalLong rowCountChange)
+    {
+        delegate.updateTableWriteId(dbName, tableName, transactionId, writeId, rowCountChange);
+    }
+
+    @Override
+    public boolean useSparkTableStatistics()
+    {
+        return delegate.useSparkTableStatistics();
+    }
+}
diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml
index 5bbe67e32d07..5ccfd7bb8930 100644
--- a/plugin/trino-hive/pom.xml
+++ b/plugin/trino-hive/pom.xml
@@ -41,6 +41,11 @@
             <classifier>classes</classifier>
         </dependency>
 
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>dev.failsafe</groupId>
             <artifactId>failsafe</artifactId>
@@ -559,6 +564,31 @@
             </plugin>
 
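+            <!-- Compiles src/test/resources/protobuf/sources/*.proto into generated test
+                 classes and a descriptor set consumed by TestDynamicSchemaHiveMetastore -->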
+            <plugin>
+                <groupId>com.github.os72</groupId>
+                <artifactId>protoc-jar-maven-plugin</artifactId>
+                <version>3.11.4</version>
+                <configuration>
+                    <protocArtifact>com.google.protobuf:protoc:${dep.protobuf.version}</protocArtifact>
+                    <protocVersion>${dep.protobuf.version}</protocVersion>
+                    <addSources>none</addSources>
+                    <inputDirectories>
+                        <include>src/test/resources/protobuf/sources</include>
+                    </inputDirectories>
+                    <outputDirectory>target/generated-test-sources/</outputDirectory>
+                    <outputTargets>
+                        <outputTarget>
+                            <type>descriptor</type>
+                            <addSources>none</addSources>
+                            <outputDirectory>target/test-classes/protobuf/descriptors</outputDirectory>
+                        </outputTarget>
+                    </outputTargets>
+                </configuration>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
@@ -603,6 +633,37 @@
             </plugin>
 
+            <plugin>
+                <groupId>com.github.os72</groupId>
+                <artifactId>protoc-jar-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-test-sources</id>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <phase>generate-test-sources</phase>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-test-sources</id>
+                        <goals>
+                            <goal>add-test-source</goal>
+                        </goals>
+                        <phase>generate-test-sources</phase>
+                        <configuration>
+                            <sources>
+                                <source>${basedir}/target/generated-test-sources</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
         </plugins>
     </build>
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java
index 31c45bc365a3..81b8581fa8b1 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java
@@ -19,11 +19,14 @@
 import io.airlift.json.JsonCodec;
 import io.airlift.units.Duration;
 import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.hive.formats.line.protobuf.ProtobufDeserializerFactory;
 import io.trino.metastore.HiveMetastore;
 import io.trino.metastore.HiveMetastoreFactory;
 import io.trino.plugin.hive.fs.DirectoryLister;
 import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory;
 import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.trino.plugin.hive.metastore.dynamic.DynamicSchemaHiveMetastore;
+import io.trino.plugin.hive.metastore.dynamic.ProtobufDeserializerFactoryLoader;
 import io.trino.plugin.hive.security.AccessControlMetadataFactory;
 import io.trino.plugin.hive.security.UsingSystemSecurity;
 import io.trino.plugin.hive.statistics.MetastoreHiveStatisticsProvider;
@@ -80,6 +83,7 @@ public class HiveMetadataFactory
     private final boolean allowTableRename;
     private final HiveTimestampPrecision hiveViewsTimestampPrecision;
     private final Executor metadataFetchingExecutor;
+    private final ProtobufDeserializerFactory protobufDeserializerFactory;
 
     @Inject
     public HiveMetadataFactory(
@@ -102,7 +106,8 @@ public HiveMetadataFactory(
             DirectoryLister directoryLister,
             TransactionScopeCachingDirectoryListerFactory transactionScopeCachingDirectoryListerFactory,
             @UsingSystemSecurity boolean usingSystemSecurity,
-            @AllowHiveTableRename boolean allowTableRename)
+            @AllowHiveTableRename boolean allowTableRename,
+            ProtobufDeserializerFactoryLoader protobufDeserializerFactoryLoader)
     {
         this(
                 catalogName,
@@ -139,7 +144,8 @@ public HiveMetadataFactory(
                 hiveConfig.isPartitionProjectionEnabled(),
                 allowTableRename,
                 hiveConfig.getTimestampPrecision(),
-                hiveConfig.getMetadataParallelism());
+                hiveConfig.getMetadataParallelism(),
+                protobufDeserializerFactoryLoader.get());
     }
 
     public HiveMetadataFactory(
@@ -177,7 +183,8 @@ public HiveMetadataFactory(
             boolean partitionProjectionEnabled,
             boolean allowTableRename,
             HiveTimestampPrecision hiveViewsTimestampPrecision,
-            int metadataParallelism)
+            int metadataParallelism,
+            ProtobufDeserializerFactory protobufDeserializerFactory)
     {
         this.catalogName = requireNonNull(catalogName, "catalogName is null");
         this.skipDeletionForAlter = skipDeletionForAlter;
@@ -202,6 +209,7 @@ public HiveMetadataFactory(
         this.systemTableProviders = requireNonNull(systemTableProviders, "systemTableProviders is null");
         this.accessControlMetadataFactory = requireNonNull(accessControlMetadataFactory, "accessControlMetadataFactory is null");
         this.hiveTransactionHeartbeatInterval = requireNonNull(hiveTransactionHeartbeatInterval, "hiveTransactionHeartbeatInterval is null");
+        this.protobufDeserializerFactory = requireNonNull(protobufDeserializerFactory, "protobufDeserializerFactory is null");
 
         fileSystemExecutor = new BoundedExecutor(executorService, maxConcurrentFileSystemOperations);
         dropExecutor = new BoundedExecutor(executorService, maxConcurrentMetastoreDrops);
@@ -231,7 +239,10 @@ public HiveMetadataFactory(
     @Override
     public TransactionalMetadata create(ConnectorIdentity identity, boolean autoCommit)
     {
-        HiveMetastore hiveMetastore = createPerTransactionCache(metastoreFactory.createMetastore(Optional.of(identity)), perTransactionCacheMaximumSize);
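+        // Wrap the per-transaction cache so protobuf-backed tables resolve their columns from descriptors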
+        HiveMetastore hiveMetastore = new DynamicSchemaHiveMetastore(
+                createPerTransactionCache(metastoreFactory.createMetastore(Optional.of(identity)), perTransactionCacheMaximumSize),
+                protobufDeserializerFactory,
+                Duration.valueOf("1h")); // TODO make configurable
 
         DirectoryLister directoryLister = transactionScopeCachingDirectoryListerFactory.get(this.directoryLister);
         SemiTransactionalHiveMetastore metastore = new SemiTransactionalHiveMetastore(
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java
index 30c454f9517b..84a176249915 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java
@@ -41,6 +41,7 @@
 import io.trino.plugin.hive.line.SimpleTextFilePageSourceFactory;
 import io.trino.plugin.hive.line.SimpleTextFileWriterFactory;
 import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
+import io.trino.plugin.hive.metastore.dynamic.ProtobufDeserializerFactoryLoader;
 import io.trino.plugin.hive.orc.OrcFileWriterFactory;
 import io.trino.plugin.hive.orc.OrcPageSourceFactory;
 import io.trino.plugin.hive.orc.OrcReaderConfig;
@@ -97,6 +98,7 @@ public void setup(Binder binder)
         binder.bind(ConnectorPageSourceProvider.class).to(HivePageSourceProvider.class).in(Scopes.SINGLETON);
         binder.bind(ConnectorPageSinkProvider.class).to(HivePageSinkProvider.class).in(Scopes.SINGLETON);
         binder.bind(ConnectorNodePartitioningProvider.class).to(HiveNodePartitioningProvider.class).in(Scopes.SINGLETON);
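+        // Single shared descriptor factory for both the page source factory and the metastore wrapper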
+        binder.bind(ProtobufDeserializerFactoryLoader.class).in(Scopes.SINGLETON);
 
         jsonCodecBinder(binder).bindJsonCodec(PartitionUpdate.class);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/ProtobufSequenceFilePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/ProtobufSequenceFilePageSourceFactory.java
index 7766aafa7c9b..0de3424ea5cf 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/ProtobufSequenceFilePageSourceFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/ProtobufSequenceFilePageSourceFactory.java
@@ -15,9 +15,9 @@
 
 import com.google.inject.Inject;
 import io.trino.filesystem.TrinoFileSystemFactory;
-import io.trino.hive.formats.line.protobuf.ProtobufDeserializerFactory;
 import io.trino.hive.formats.line.sequence.SequenceFileReaderFactory;
 import io.trino.plugin.hive.HiveConfig;
+import io.trino.plugin.hive.metastore.dynamic.ProtobufDeserializerFactoryLoader;
 
 import static java.lang.Math.toIntExact;
 
@@ -25,10 +25,10 @@ public class ProtobufSequenceFilePageSourceFactory
         extends LinePageSourceFactory
 {
     @Inject
-    public ProtobufSequenceFilePageSourceFactory(TrinoFileSystemFactory trinoFileSystemFactory, HiveConfig config)
+    public ProtobufSequenceFilePageSourceFactory(TrinoFileSystemFactory trinoFileSystemFactory, ProtobufDeserializerFactoryLoader loader, HiveConfig config)
     {
         super(trinoFileSystemFactory,
-                new ProtobufDeserializerFactory(config.getProtobufDescriptorsLocation(), config.getProtobufDescriptorsCacheRefreshInterval(), config.getProtobufDescriptorsCacheMaxSize()),
+                loader.get(),
                 new SequenceFileReaderFactory(1024, toIntExact(config.getTextMaxLineLength().toBytes())));
     }
 }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/dynamic/DynamicSchemaHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/dynamic/DynamicSchemaHiveMetastore.java
new file mode 100644
index 000000000000..c1f4994e485c
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/dynamic/DynamicSchemaHiveMetastore.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.metastore.dynamic;
+
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.protobuf.Descriptors;
+import io.airlift.units.Duration;
+import io.trino.cache.EvictableCacheBuilder;
+import io.trino.hive.formats.line.protobuf.ProtobufDeserializerFactory;
+import io.trino.metastore.Column;
+import io.trino.metastore.HiveMetastore;
+import io.trino.metastore.HiveMetastoreWrapper;
+import io.trino.metastore.Partition;
+import io.trino.metastore.Table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static io.trino.hive.formats.line.protobuf.ProtobufDeserializerFactory.SERIALIZATION_CLASS;
+import static io.trino.plugin.hive.HiveStorageFormat.SEQUENCEFILE_PROTOBUF;
+import static java.util.stream.Collectors.toMap;
+
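+/**
+ * Metastore decorator that replaces the declared columns of tables stored as
+ * SEQUENCEFILE_PROTOBUF with columns derived from the table's protobuf
+ * descriptor, looked up via the {@code serialization.class} serde property.
+ * Derived schemas are cached per table name until the given expiration.
+ */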
+public class DynamicSchemaHiveMetastore
+        extends HiveMetastoreWrapper
+{
+    private final LoadingCache<TableReference, List<Column>> dynamicSchemaCache;
+
+    public DynamicSchemaHiveMetastore(HiveMetastore delegate, ProtobufDeserializerFactory protobufDeserializerFactory, Duration dynamicSchemaCacheExpiration)
+    {
+        super(delegate);
+        dynamicSchemaCache = EvictableCacheBuilder.newBuilder()
+                .expireAfterWrite(dynamicSchemaCacheExpiration.toJavaTime())
+                .build(new CacheLoader<>() {
+                    @Override
+                    public List<Column> load(TableReference tableName)
+                    {
+                        Descriptors.Descriptor descriptor = protobufDeserializerFactory.getDescriptor(tableName.getSerializationClass());
+                        return descriptor.getFields().stream().map(DynamicSchemaLoader::fieldToColumn).toList();
+                    }
+                });
+    }
+
+    private static boolean isTableWithDynamicSchema(Table table)
+    {
+        return table.getStorage() != null && table.getStorage().getStorageFormat() != null && SEQUENCEFILE_PROTOBUF.getSerde().equals(table.getStorage().getStorageFormat().getSerDeNullable());
+    }
+
+    private List<Column> getDynamicColumns(Table table)
+    {
+        return dynamicSchemaCache.getUnchecked(new TableReference(table));
+    }
+
+    private Partition getDynamicPartition(Table table, Partition partition)
+    {
+        return Partition.builder(partition)
+                .setColumns(getDynamicColumns(table))
+                .build();
+    }
+
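+    // The overrides below substitute descriptor-derived columns wherever the
+    // metastore would otherwise return the (empty) declared schema.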
+    @Override
+    public Optional<Table> getTable(String databaseName, String tableName)
+    {
+        return delegate.getTable(databaseName, tableName).map(table -> {
+            if (isTableWithDynamicSchema(table)) {
+                return Table.builder(table)
+                        .setDataColumns(getDynamicColumns(table))
+                        .build();
+            }
+            return table;
+        });
+    }
+
+    @Override
+    public Optional<Partition> getPartition(Table table, List<String> partitionValues)
+    {
+        return delegate.getPartition(table, partitionValues).map(partition -> {
+            if (isTableWithDynamicSchema(table)) {
+                return getDynamicPartition(table, partition);
+            }
+            return partition;
+        });
+    }
+
+    @Override
+    public Map<String, Optional<Partition>> getPartitionsByNames(Table table, List<String> partitionNames)
+    {
+        Map<String, Optional<Partition>> partitionsByNames = delegate.getPartitionsByNames(table, partitionNames);
+        if (isTableWithDynamicSchema(table)) {
+            return partitionsByNames.entrySet().stream()
+                    .collect(toMap(Map.Entry::getKey, e -> e.getValue()
+                            .map(partition -> getDynamicPartition(table, partition))));
+        }
+        return partitionsByNames;
+    }
+
+    // Class to use as cache key, using the storage and table name
+    private record TableReference(Table table)
+    {
+        private String getFullTableName()
+        {
+            return String.format("%s.%s", table.getDatabaseName(), table.getTableName());
+        }
+
+        private String getSerializationClass()
+        {
+            String serializationClass = table.getStorage().getSerdeParameters().get(SERIALIZATION_CLASS);
+            if (serializationClass != null) {
+                return serializationClass;
+            }
+            throw new IllegalStateException(SERIALIZATION_CLASS + " missing in table " + getFullTableName());
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TableReference tableName = (TableReference) o;
+            return Objects.equals(getFullTableName(), tableName.getFullTableName());
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(getFullTableName());
+        }
+    }
+}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/dynamic/DynamicSchemaLoader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/dynamic/DynamicSchemaLoader.java
new file mode 100644
index 000000000000..bf34801aa5ed
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/dynamic/DynamicSchemaLoader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.metastore.dynamic;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import io.airlift.log.Logger;
+import io.trino.metastore.Column;
+import io.trino.metastore.HiveType;
+import io.trino.metastore.type.TypeInfo;
+import io.trino.metastore.type.TypeInfoFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Locale.ENGLISH;
+
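+/**
+ * Maps protobuf field descriptors to Hive columns: scalars map to the
+ * corresponding Hive primitives (BYTE_STRING to binary, ENUM to string),
+ * nested messages become structs, and repeated non-map fields become arrays.
+ * Recursive message references are dropped with a warning.
+ */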
+public final class DynamicSchemaLoader
+{
+    private static final Logger LOG = Logger.get(DynamicSchemaLoader.class);
+
+    private DynamicSchemaLoader()
+    {
+    }
+
+    public static Column fieldToColumn(FieldDescriptor fieldDescriptor)
+    {
+        String name = fieldDescriptor.getName().toLowerCase(ENGLISH);
+        return new Column(name, HiveType.fromTypeInfo(getType(fieldDescriptor, ImmutableList.of(fieldDescriptor.getFullName()))), Optional.of(fieldDescriptor.getFullName()), Map.of());
+    }
+
+    private static TypeInfo getType(FieldDescriptor fieldDescriptor, List<String> path)
+    {
+        TypeInfo baseType = switch (fieldDescriptor.getJavaType()) {
+            case BOOLEAN -> HiveType.HIVE_BOOLEAN.getTypeInfo();
+            case INT -> HiveType.HIVE_INT.getTypeInfo();
+            case LONG -> HiveType.HIVE_LONG.getTypeInfo();
+            case FLOAT -> HiveType.HIVE_FLOAT.getTypeInfo();
+            case DOUBLE -> HiveType.HIVE_DOUBLE.getTypeInfo();
+            case BYTE_STRING -> HiveType.HIVE_BINARY.getTypeInfo();
+            case STRING, ENUM -> HiveType.HIVE_STRING.getTypeInfo();
+            case MESSAGE -> {
+                List<String> names = new ArrayList<>();
+                List<TypeInfo> typeInfos = new ArrayList<>();
+                List<FieldDescriptor> innerDescriptors = fieldDescriptor.getMessageType().getFields();
+                for (FieldDescriptor innerDescriptor : innerDescriptors) {
+                    if (path.contains(innerDescriptor.getFullName())) {
+                        LOG.warn("Descriptor recursion detected; omitting " + innerDescriptor.getFullName());
+                        continue;
+                    }
+                    List<String> innerPath = new ArrayList<>(path);
+                    innerPath.add(innerDescriptor.getFullName());
+
+                    names.add(innerDescriptor.getName());
+                    typeInfos.add(getType(innerDescriptor, ImmutableList.copyOf(innerPath)));
+                }
+                yield TypeInfoFactory.getStructTypeInfo(names, typeInfos);
+            }
+        };
+        if (fieldDescriptor.isRepeated() && !fieldDescriptor.isMapField()) {
+            return TypeInfoFactory.getListTypeInfo(baseType);
+        }
+        return baseType;
+    }
+}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/dynamic/ProtobufDeserializerFactoryLoader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/dynamic/ProtobufDeserializerFactoryLoader.java
new file mode 100644
index 000000000000..1ba3d57e7b18
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/dynamic/ProtobufDeserializerFactoryLoader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.metastore.dynamic;
+
+import com.google.inject.Inject;
+import io.trino.hive.formats.line.protobuf.ProtobufDeserializerFactory;
+import io.trino.plugin.hive.HiveConfig;
+
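+/**
+ * Holds the single {@link ProtobufDeserializerFactory} built from
+ * {@link HiveConfig}, so the descriptor cache is shared by every consumer
+ * bound in {@code HiveModule}.
+ */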
+public class ProtobufDeserializerFactoryLoader
+{
+    private final ProtobufDeserializerFactory factory;
+
+    @Inject
+    public ProtobufDeserializerFactoryLoader(HiveConfig config)
+    {
+        factory = new ProtobufDeserializerFactory(config.getProtobufDescriptorsLocation(), config.getProtobufDescriptorsCacheRefreshInterval(), config.getProtobufDescriptorsCacheMaxSize());
+    }
+
+    public ProtobufDeserializerFactory get()
+    {
+        return factory;
+    }
+}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
index 091cd15135e0..69ebd6a3b325 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
@@ -823,6 +823,9 @@ public synchronized void addPartitions(String databaseName, String tableName, List<PartitionWithStatistics> partitions)
 
     @GuardedBy("this")
     private void verifiedPartition(Table table, Partition partition)
     {
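+        // With location checks disabled, accept whatever location the partition declares
+        // (the new test registers partitions under a temp path)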
+        if (disableLocationChecks) {
+            return;
+        }
         Location partitionMetadataDirectory = getPartitionMetadataDirectory(table, partition.getValues());
 
         if (table.getTableType().equals(MANAGED_TABLE.name())) {
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java
index 3a96d1816d14..7269b060b7cc 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java
@@ -49,6 +49,7 @@
 import io.trino.plugin.hive.line.SimpleSequenceFileWriterFactory;
 import io.trino.plugin.hive.line.SimpleTextFilePageSourceFactory;
 import io.trino.plugin.hive.line.SimpleTextFileWriterFactory;
+import io.trino.plugin.hive.metastore.dynamic.ProtobufDeserializerFactoryLoader;
 import io.trino.plugin.hive.orc.OrcFileWriterFactory;
 import io.trino.plugin.hive.orc.OrcPageSourceFactory;
 import io.trino.plugin.hive.orc.OrcReaderConfig;
@@ -180,7 +181,7 @@ public static Set<HivePageSourceFactory> getDefaultHivePageSourceFactories(TrinoFileSystemFactory fileSystemFactory, HiveConfig hiveConfig)
             .add(new RcFilePageSourceFactory(fileSystemFactory, hiveConfig))
             .add(new OrcPageSourceFactory(new OrcReaderConfig(), fileSystemFactory, stats, hiveConfig))
             .add(new ParquetPageSourceFactory(fileSystemFactory, stats, Optional.empty(), new ParquetReaderConfig(), hiveConfig))
-            .add(new ProtobufSequenceFilePageSourceFactory(fileSystemFactory, hiveConfig))
+            .add(new ProtobufSequenceFilePageSourceFactory(fileSystemFactory, new ProtobufDeserializerFactoryLoader(hiveConfig), hiveConfig))
             .build();
 }
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/dynamic/TestDynamicSchemaHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/dynamic/TestDynamicSchemaHiveMetastore.java
new file mode 100644
index 000000000000..110051a85a74
--- /dev/null
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/dynamic/TestDynamicSchemaHiveMetastore.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.metastore.dynamic;
+
+import com.google.common.collect.ImmutableList;
+import io.airlift.units.Duration;
+import io.trino.filesystem.local.LocalFileSystemFactory;
+import io.trino.hive.formats.line.protobuf.ProtobufDeserializerFactory;
+import io.trino.metastore.Column;
+import io.trino.metastore.Database;
+import io.trino.metastore.HiveMetastore;
+import io.trino.metastore.Partition;
+import io.trino.metastore.PartitionStatistics;
+import io.trino.metastore.PartitionWithStatistics;
+import io.trino.metastore.Table;
+import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
+import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
+import io.trino.spi.NodeVersion;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static com.google.common.io.MoreFiles.deleteRecursively;
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static io.trino.metastore.HiveType.HIVE_INT;
+import static io.trino.metastore.HiveType.HIVE_STRING;
+import static io.trino.metastore.PrincipalPrivileges.NO_PRIVILEGES;
+import static io.trino.metastore.type.TypeInfoFactory.getListTypeInfo;
+import static io.trino.metastore.type.TypeInfoFactory.getStructTypeInfo;
+import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
+import static io.trino.plugin.hive.HiveStorageFormat.SEQUENCEFILE_PROTOBUF;
+import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
+import static java.nio.file.Files.createTempDirectory;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestDynamicSchemaHiveMetastore
+{
+    private static String databaseName = "test_database_" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+    private static String tableName = "test_table";
+    private static Path tempDir;
+    private static HiveMetastore metastore;
+
+    @BeforeAll
+    static void beforeAll()
+            throws Exception
+    {
+        tempDir = createTempDirectory("test");
+        LocalFileSystemFactory fileSystemFactory = new LocalFileSystemFactory(tempDir);
+
+        HiveMetastore delegate = new FileHiveMetastore(
+                new NodeVersion("testversion"),
+                fileSystemFactory,
+                false,
+                new FileHiveMetastoreConfig()
+                        .setCatalogDirectory("local:///")
+                        .setMetastoreUser("test")
+                        .setDisableLocationChecks(true));
+
+        ProtobufDeserializerFactory protobufDeserializerFactory = new ProtobufDeserializerFactory(Path.of(TestDynamicSchemaHiveMetastore.class.getResource("/protobuf/descriptors").toURI()), Duration.valueOf("1h"), 10);
+        Duration dynamicSchemaCacheExpiration = Duration.valueOf("1h");
+        metastore = new DynamicSchemaHiveMetastore(delegate, protobufDeserializerFactory, dynamicSchemaCacheExpiration);
+
+        Database.Builder database = Database.builder()
+                .setDatabaseName(databaseName)
+                .setOwnerName(Optional.empty())
+                .setOwnerType(Optional.empty());
+        metastore.createDatabase(database.build());
+
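+        // The serialization.class serde property below must follow the {package}${protoname}
+        // shape enforced by ProtobufDeserializerFactory and match a message from person.proto.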
+        Table.Builder tableBuilder = Table.builder()
+                .setDatabaseName(databaseName)
+                .setTableName(tableName)
+                .setParameters(Map.of(TRINO_QUERY_ID_NAME, "query_id"))
+                .setTableType(MANAGED_TABLE.name())
+                .setDataColumns(List.of()) // No columns needed, they are inferred from the protobuf
+                .setOwner(Optional.empty())
+                .setPartitionColumns(ImmutableList.of(new Column("year", HIVE_STRING, Optional.empty(), Map.of())));
+
+        tableBuilder.getStorageBuilder()
+                //.setLocation(Optional.of("/tmp/location"))
+                .setStorageFormat(SEQUENCEFILE_PROTOBUF.toStorageFormat())
+                .setSerdeParameters(Map.of("serialization.class", "com.example.tutorial.protos.AddressBookProtos$Person"));
+
+        metastore.createTable(tableBuilder.build(), NO_PRIVILEGES);
+        Path tablePath = tempDir.resolve(databaseName).resolve(tableName);
+        Path partitionPath = tablePath.resolve("year=2025");
+
+        Partition partition = Partition.builder()
+                .setDatabaseName(databaseName)
+                .setTableName(tableName)
+                .setValues(List.of("2025"))
+                .withStorage(storageBuilder -> storageBuilder
+                        .setStorageFormat(SEQUENCEFILE_PROTOBUF.toStorageFormat())
+                        .setLocation(partitionPath.toString()))
+                .setColumns(List.of())
+                .build();
+        metastore.addPartitions(databaseName, tableName, List.of(new PartitionWithStatistics(partition, "year=2025", PartitionStatistics.empty())));
+    }
+
+    @AfterAll
+    static void tearDown()
+            throws IOException
+    {
+        metastore.dropTable(databaseName, tableName, false);
+        metastore.dropDatabase(databaseName, false);
+        deleteRecursively(tempDir, ALLOW_INSECURE);
+    }
+
+    @Test
+    void testTableWithDynamicSchema()
+    {
+        Table table = metastore.getTable(databaseName, tableName).orElseThrow(NullPointerException::new);
+        assertColumns(table.getDataColumns());
+    }
+
+    @Test
+    void testDynamicSchemaPartitions()
+    {
+        Table table = metastore.getTable(databaseName, tableName).orElseThrow(NullPointerException::new);
+        Partition partition = metastore.getPartition(table, List.of("2025")).orElseThrow();
+        assertColumns(partition.getColumns());
+    }
+
+    @Test
+    void testDynamicSchemaPartitionsByNames()
+    {
+        Table table = metastore.getTable(databaseName, tableName).orElseThrow(NullPointerException::new);
+        Map<String, Optional<Partition>> partitions = metastore.getPartitionsByNames(table, List.of("year=2025"));
+        assertThat(partitions).hasSize(1);
+        assertThat(partitions.get("year=2025")).isNotNull();
+        assertThat(partitions.get("year=2025").isPresent()).isTrue();
+        assertColumns(partitions.get("year=2025").get().getColumns());
+    }
+
+    private static void assertColumns(List<Column> columns)
+    {
+        assertThat(columns.get(0).getName()).isEqualTo("name");
+        assertThat(columns.get(0).getType()).isEqualTo(HIVE_STRING);
+
+        assertThat(columns.get(1).getName()).isEqualTo("id");
+        assertThat(columns.get(1).getType()).isEqualTo(HIVE_INT);
+
+        assertThat(columns.get(2).getName()).isEqualTo("email");
+        assertThat(columns.get(2).getType()).isEqualTo(HIVE_STRING);
+
+        assertThat(columns.get(3).getName()).isEqualTo("phones");
+        assertThat(columns.get(3).getType().getTypeInfo()).isEqualTo(getListTypeInfo(
+                getStructTypeInfo(List.of("number", "type"), List.of(HIVE_STRING.getTypeInfo(), HIVE_STRING.getTypeInfo()))));
+    }
+}
diff --git a/plugin/trino-hive/src/test/resources/protobuf/sources/person.proto b/plugin/trino-hive/src/test/resources/protobuf/sources/person.proto
new file mode 100644
index 000000000000..8fbebd8b120c
--- /dev/null
+++ b/plugin/trino-hive/src/test/resources/protobuf/sources/person.proto
@@ -0,0 +1,27 @@
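+// Borrowed from the protobuf tutorial's AddressBook example; the build compiles
+// it into the descriptor set that the test above loads from /protobuf/descriptors
+// (see the protoc-jar-maven-plugin configuration in pom.xml).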
+syntax = "proto2";
+
+package tutorial;
+
+option java_multiple_files = true;
+option java_package = "com.example.tutorial.protos";
+option java_outer_classname = "AddressBookProtos";
+
+message Person {
+  optional string name = 1;
+  optional int32 id = 2;
+  optional string email = 3;
+
+  enum PhoneType {
+    PHONE_TYPE_UNSPECIFIED = 0;
+    PHONE_TYPE_MOBILE = 1;
+    PHONE_TYPE_HOME = 2;
+    PHONE_TYPE_WORK = 3;
+  }
+
+  message PhoneNumber {
+    optional string number = 1;
+    optional PhoneType type = 2 [default = PHONE_TYPE_HOME];
+  }
+
+  repeated PhoneNumber phones = 4;
+}