From a36cd30987846f3c0f6447998556e478f82eae6a Mon Sep 17 00:00:00 2001 From: beliefer Date: Thu, 12 Mar 2026 15:59:18 +0800 Subject: [PATCH] [GLUTEN-11692][CORE] Driver side should avoid submitting subqueries while validating file formats --- .../hive/HiveTableScanExecTransformer.scala | 14 +++++--- .../execution/AbstractHiveTableScanExec.scala | 32 +++++++++++-------- .../execution/AbstractHiveTableScanExec.scala | 32 +++++++++++-------- .../execution/AbstractHiveTableScanExec.scala | 32 +++++++++++-------- .../execution/AbstractHiveTableScanExec.scala | 32 +++++++++++-------- .../execution/AbstractHiveTableScanExec.scala | 32 +++++++++++-------- 6 files changed, 100 insertions(+), 74 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala index b971eb46bc2c..0abb1a91f4d7 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala @@ -77,7 +77,16 @@ case class HiveTableScanExecTransformer( partitionWithReadFileFormats override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] = - distinctReadFileFormats + if ( + relation.isPartitioned && + basePrunedPartitions.exists(_.getInputFormatClass != tableDesc.getInputFileFormatClass) + ) { + basePrunedPartitions.map { + partition => getReadFileFormat(HiveClientImpl.fromHivePartition(partition).storage) + }.toSet + } else { + Set(fileFormat) + } override def getPartitionSchema: StructType = relation.tableMeta.partitionSchema @@ -120,9 +129,6 @@ case class HiveTableScanExecTransformer( @transient private lazy val partitions: Seq[Partition] = partitionWithReadFileFormats.unzip._1 - @transient private lazy val distinctReadFileFormats: Set[ReadFileFormat] = - partitionWithReadFileFormats.iterator.map(_._2).toSet - @transient override lazy val fileFormat: ReadFileFormat = getReadFileFormat(relation.tableMeta.storage) diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala index ad1857b72129..8b2a54a0649e 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala @@ -173,26 +173,30 @@ abstract private[hive] class AbstractHiveTableScanExec( } } - @transient lazy val prunedPartitions: Seq[HivePartition] = { + // This is used on the driver side, so it is important to avoid executing subqueries + @transient lazy val basePrunedPartitions: Seq[HivePartition] = { + if (relation.prunedPartitions.nonEmpty) { + relation.prunedPartitions.get.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) + } else { + rawPartitions + } + } + + @transient lazy val prunedPartitions: Seq[HivePartition] = if (relation.prunedPartitions.nonEmpty) { - val hivePartitions = - relation.prunedPartitions.get.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) if (partitionPruningPred.forall(!ExecSubqueryExpression.hasSubquery(_))) { - hivePartitions + basePrunedPartitions } else { - prunePartitions(hivePartitions) + prunePartitions(basePrunedPartitions) } + } else if ( + sparkSession.sessionState.conf.metastorePartitionPruning && + partitionPruningPred.nonEmpty + ) { + basePrunedPartitions } else { - if ( - sparkSession.sessionState.conf.metastorePartitionPruning && - partitionPruningPred.nonEmpty - ) { - rawPartitions - } else { - prunePartitions(rawPartitions) - } + prunePartitions(basePrunedPartitions) } - } // exposed for tests @transient lazy val rawPartitions: Seq[HivePartition] = { diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala index 8422d33e521d..5e747e7df345 100644 --- a/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala +++ b/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala @@ -175,26 +175,30 @@ abstract private[hive] class AbstractHiveTableScanExec( } } - @transient lazy val prunedPartitions: Seq[HivePartition] = { + // This is used on the driver side, so it is important to avoid executing subqueries + @transient lazy val basePrunedPartitions: Seq[HivePartition] = { + if (relation.prunedPartitions.nonEmpty) { + relation.prunedPartitions.get.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) + } else { + rawPartitions + } + } + + @transient lazy val prunedPartitions: Seq[HivePartition] = if (relation.prunedPartitions.nonEmpty) { - val hivePartitions = - relation.prunedPartitions.get.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) if (partitionPruningPred.forall(!ExecSubqueryExpression.hasSubquery(_))) { - hivePartitions + basePrunedPartitions } else { - prunePartitions(hivePartitions) + prunePartitions(basePrunedPartitions) } + } else if ( + sparkSession.sessionState.conf.metastorePartitionPruning && + partitionPruningPred.nonEmpty + ) { + basePrunedPartitions } else { - if ( - sparkSession.sessionState.conf.metastorePartitionPruning && - partitionPruningPred.nonEmpty - ) { - rawPartitions - } else { - prunePartitions(rawPartitions) - } + prunePartitions(basePrunedPartitions) } - } // exposed for tests @transient lazy val rawPartitions: Seq[HivePartition] = { diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala index 8422d33e521d..5e747e7df345 100644 --- a/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala +++ b/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala @@ -175,26 +175,30 @@ abstract private[hive] class AbstractHiveTableScanExec( } } - @transient lazy val prunedPartitions: Seq[HivePartition] = { + // This is used on the driver side, so it is important to avoid executing subqueries + @transient lazy val basePrunedPartitions: Seq[HivePartition] = { + if (relation.prunedPartitions.nonEmpty) { + relation.prunedPartitions.get.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) + } else { + rawPartitions + } + } + + @transient lazy val prunedPartitions: Seq[HivePartition] = if (relation.prunedPartitions.nonEmpty) { - val hivePartitions = - relation.prunedPartitions.get.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) if (partitionPruningPred.forall(!ExecSubqueryExpression.hasSubquery(_))) { - hivePartitions + basePrunedPartitions } else { - prunePartitions(hivePartitions) + prunePartitions(basePrunedPartitions) } + } else if ( + sparkSession.sessionState.conf.metastorePartitionPruning && + partitionPruningPred.nonEmpty + ) { + basePrunedPartitions } else { - if ( - sparkSession.sessionState.conf.metastorePartitionPruning && - partitionPruningPred.nonEmpty - ) { - rawPartitions - } else { - prunePartitions(rawPartitions) - } + prunePartitions(basePrunedPartitions) } - } // exposed for tests @transient lazy val rawPartitions: Seq[HivePartition] = { diff --git a/shims/spark40/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala b/shims/spark40/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala index 8422d33e521d..5e747e7df345 100644 --- a/shims/spark40/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala +++ b/shims/spark40/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala @@ -175,26 +175,30 @@ abstract private[hive] class AbstractHiveTableScanExec( } } - @transient lazy val prunedPartitions: Seq[HivePartition] = { + // This is used on the driver side, so it is important to avoid executing subqueries + @transient lazy val basePrunedPartitions: Seq[HivePartition] = { + if (relation.prunedPartitions.nonEmpty) { + relation.prunedPartitions.get.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) + } else { + rawPartitions + } + } + + @transient lazy val prunedPartitions: Seq[HivePartition] = if (relation.prunedPartitions.nonEmpty) { - val hivePartitions = - relation.prunedPartitions.get.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) if (partitionPruningPred.forall(!ExecSubqueryExpression.hasSubquery(_))) { - hivePartitions + basePrunedPartitions } else { - prunePartitions(hivePartitions) + prunePartitions(basePrunedPartitions) } + } else if ( + sparkSession.sessionState.conf.metastorePartitionPruning && + partitionPruningPred.nonEmpty + ) { + basePrunedPartitions } else { - if ( - sparkSession.sessionState.conf.metastorePartitionPruning && - partitionPruningPred.nonEmpty - ) { - rawPartitions - } else { - prunePartitions(rawPartitions) - } + prunePartitions(basePrunedPartitions) } - } // exposed for tests @transient lazy val rawPartitions: Seq[HivePartition] = { diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala index 8422d33e521d..5e747e7df345 100644 --- a/shims/spark41/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala +++ b/shims/spark41/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala @@ -175,26 +175,30 @@ abstract private[hive] class AbstractHiveTableScanExec( } } - @transient lazy val prunedPartitions: Seq[HivePartition] = { + // This is used on the driver side, so it is important to avoid executing subqueries + @transient lazy val basePrunedPartitions: Seq[HivePartition] = { + if (relation.prunedPartitions.nonEmpty) { + relation.prunedPartitions.get.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) + } else { + rawPartitions + } + } + + @transient lazy val prunedPartitions: Seq[HivePartition] = if (relation.prunedPartitions.nonEmpty) { - val hivePartitions = - relation.prunedPartitions.get.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) if (partitionPruningPred.forall(!ExecSubqueryExpression.hasSubquery(_))) { - hivePartitions + basePrunedPartitions } else { - prunePartitions(hivePartitions) + prunePartitions(basePrunedPartitions) } + } else if ( + sparkSession.sessionState.conf.metastorePartitionPruning && + partitionPruningPred.nonEmpty + ) { + basePrunedPartitions } else { - if ( - sparkSession.sessionState.conf.metastorePartitionPruning && - partitionPruningPred.nonEmpty - ) { - rawPartitions - } else { - prunePartitions(rawPartitions) - } + prunePartitions(basePrunedPartitions) } - } // exposed for tests @transient lazy val rawPartitions: Seq[HivePartition] = {