From 9c3aeef11895d803d0f7a56cd4450f6a7ff9e5a1 Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Fri, 6 Mar 2026 12:50:17 +0530 Subject: [PATCH 1/6] Fix DPP regression for Hive scans and add DynamicPartitionPruningHiveScanSuite --- .../offload/OffloadSingleNodeRules.scala | 1 - .../hive/HiveTableScanExecTransformer.scala | 17 +++++++- .../utils/velox/VeloxTestSettings.scala | 7 ++++ ...DynamicPartitionPruningHiveScanSuite.scala | 40 +++++++++++++++++++ .../utils/velox/VeloxTestSettings.scala | 7 ++++ ...DynamicPartitionPruningHiveScanSuite.scala | 40 +++++++++++++++++++ 6 files changed, 109 insertions(+), 3 deletions(-) create mode 100644 gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala create mode 100644 gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala index 684fbd36f1ac..f0b6491e7d04 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala @@ -202,7 +202,6 @@ object OffloadOthers { case plan: FileSourceScanExec => ScanTransformerFactory.createFileSourceScanTransformer(plan) case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) => - // TODO: Add DynamicPartitionPruningHiveScanSuite.scala HiveTableScanExecTransformer(plan) case plan: CoalesceExec => ColumnarCoalesceExec(plan.numPartitions, plan.child) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala index b971eb46bc2c..75f2944ec290 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala @@ -76,8 +76,21 @@ case class HiveTableScanExecTransformer( override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] = partitionWithReadFileFormats - override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] = - distinctReadFileFormats + override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] = { + if (!relation.isPartitioned) { + Set(fileFormat) + } else { + // Use statically pruned partitions from the relation instead of prunedPartitions + // to avoid triggering DPP (Dynamic Partition Pruning) subquery evaluation during + // validation, when those subqueries have not yet been executed. + relation.prunedPartitions match { + case Some(partitions) if partitions.nonEmpty => + partitions.map(p => getReadFileFormat(p.storage)).toSet + case _ => + Set(fileFormat) + } + } + } override def getPartitionSchema: StructType = relation.tableMeta.partitionSchema diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 399661654ff6..b0241b313c06 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.metric.{GlutenCustomMetricsSuite, GlutenSQ import org.apache.spark.sql.execution.python._ import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite} +import org.apache.spark.sql.hive.{GlutenDynamicPartitionPruningHiveScanSuiteAEOff, GlutenDynamicPartitionPruningHiveScanSuiteAEOn} import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite import org.apache.spark.sql.sources._ import org.apache.spark.sql.streaming._ @@ -925,6 +926,12 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOn] enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOnDisableScan] enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffDisableScan] + enableSuite[GlutenDynamicPartitionPruningHiveScanSuiteAEOff] + .exclude("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec") + .exclude("Make sure dynamic pruning works on uncorrelated queries") + enableSuite[GlutenDynamicPartitionPruningHiveScanSuiteAEOn] + .exclude("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec") + .exclude("Make sure dynamic pruning works on uncorrelated queries") enableSuite[GlutenExpressionsSchemaSuite] enableSuite[GlutenExtraStrategiesSuite] enableSuite[GlutenFileBasedDataSourceSuite] diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala new file mode 100644 index 000000000000..07566cea7e3d --- /dev/null +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import org.apache.spark.SparkConf +import org.apache.spark.sql.GlutenDynamicPartitionPruningSuiteBase +import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} + +abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase + extends GlutenDynamicPartitionPruningSuiteBase { + + override val tableFormat: String = "hive" + + override def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.catalogImplementation", "hive") + } +} + +class GlutenDynamicPartitionPruningHiveScanSuiteAEOff + extends GlutenDynamicPartitionPruningHiveScanSuiteBase + with DisableAdaptiveExecutionSuite + +class GlutenDynamicPartitionPruningHiveScanSuiteAEOn + extends GlutenDynamicPartitionPruningHiveScanSuiteBase + with EnableAdaptiveExecutionSuite diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 10802c889295..ec6b4506cfee 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.metric.{GlutenCustomMetricsSuite, GlutenSQ import org.apache.spark.sql.execution.python._ import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite} +import org.apache.spark.sql.hive.{GlutenDynamicPartitionPruningHiveScanSuiteAEOff, GlutenDynamicPartitionPruningHiveScanSuiteAEOn} import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite import org.apache.spark.sql.sources._ import org.apache.spark.sql.streaming._ @@ -898,6 +899,12 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOn] enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOnDisableScan] enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffDisableScan] + enableSuite[GlutenDynamicPartitionPruningHiveScanSuiteAEOff] + .exclude("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec") + .exclude("Make sure dynamic pruning works on uncorrelated queries") + enableSuite[GlutenDynamicPartitionPruningHiveScanSuiteAEOn] + .exclude("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec") + .exclude("Make sure dynamic pruning works on uncorrelated queries") enableSuite[GlutenExpressionsSchemaSuite] enableSuite[GlutenExtraStrategiesSuite] enableSuite[GlutenFileBasedDataSourceSuite] diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala new file mode 100644 index 000000000000..07566cea7e3d --- /dev/null +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import org.apache.spark.SparkConf +import org.apache.spark.sql.GlutenDynamicPartitionPruningSuiteBase +import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} + +abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase + extends GlutenDynamicPartitionPruningSuiteBase { + + override val tableFormat: String = "hive" + + override def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.catalogImplementation", "hive") + } +} + +class GlutenDynamicPartitionPruningHiveScanSuiteAEOff + extends GlutenDynamicPartitionPruningHiveScanSuiteBase + with DisableAdaptiveExecutionSuite + +class GlutenDynamicPartitionPruningHiveScanSuiteAEOn + extends GlutenDynamicPartitionPruningHiveScanSuiteBase + with EnableAdaptiveExecutionSuite From 758dfbc2cb9c5234324fddec05e313aed793d040 Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Fri, 6 Mar 2026 20:44:23 +0530 Subject: [PATCH 2/6] Fix test --- ...DynamicPartitionPruningHiveScanSuite.scala | 97 +++++++++++++++++-- ...DynamicPartitionPruningHiveScanSuite.scala | 97 +++++++++++++++++-- 2 files changed, 178 insertions(+), 16 deletions(-) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala index 07566cea7e3d..047118cca9df 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala @@ -16,18 +16,99 @@ */ package org.apache.spark.sql.hive -import org.apache.spark.SparkConf -import org.apache.spark.sql.GlutenDynamicPartitionPruningSuiteBase -import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} +import org.apache.gluten.execution.FileSourceScanExecTransformer + +import org.apache.spark.sql.{DataFrame, GlutenTestsBaseTrait} +import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} +import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec, InSubqueryExec, ReusedSubqueryExec, SparkPlan, SubqueryExec} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec} +import org.apache.spark.sql.hive.execution.HiveTableScanExec abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase - extends GlutenDynamicPartitionPruningSuiteBase { + extends DynamicPartitionPruningHiveScanSuiteBase + with GlutenTestsBaseTrait { + + override protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = { + flatMap(plan) { + case f: FileSourceScanExecTransformer => + f.partitionFilters.collect { case d: DynamicPruningExpression => d.child } + case h: HiveTableScanExec => + h.partitionPruningPred.collect { case d: DynamicPruningExpression => d.child } + case _ => Nil + } + } + + override def checkPartitionPruningPredicate( + df: DataFrame, + withSubquery: Boolean, + withBroadcast: Boolean): Unit = { + df.collect() + + val plan = df.queryExecution.executedPlan + val dpExprs = collectDynamicPruningExpressions(plan) + val hasSubquery = dpExprs.exists { + case InSubqueryExec(_, _: SubqueryExec, _, _, _, _) => true + case _ => false + } + val subqueryBroadcast = dpExprs.collect { + case InSubqueryExec(_, b: ColumnarSubqueryBroadcastExec, _, _, _, _) => b + } + + val hasFilter = if (withSubquery) "Should" else "Shouldn't" + assert( + hasSubquery == withSubquery, + s"$hasFilter trigger DPP with a subquery duplicate:\n${df.queryExecution}") + val hasBroadcast = if (withBroadcast) "Should" else "Shouldn't" + assert( + subqueryBroadcast.nonEmpty == withBroadcast, + s"$hasBroadcast trigger DPP with a reused broadcast exchange:\n${df.queryExecution}") + + subqueryBroadcast.foreach { + s => + s.child match { + case _: ReusedExchangeExec => // reuse check ok. + case a: AdaptiveSparkPlanExec => + val broadcastQueryStage = collectFirst(a) { case b: BroadcastQueryStageExec => b } + val broadcastPlan = broadcastQueryStage.get.broadcast + val hasReuse = find(plan) { + case ReusedExchangeExec(_, e) => e eq broadcastPlan + case b: BroadcastExchangeLike => b eq broadcastPlan + case _ => false + }.isDefined + assert(hasReuse, s"$s\nshould have been reused in\n$plan") + case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) => // reuse check ok. + case b: BroadcastExchangeLike => + val hasReuse = plan.find { + case ReusedExchangeExec(_, e) => e eq b + case _ => false + }.isDefined + assert(hasReuse, s"$s\nshould have been reused in\n$plan") + case _ => + fail(s"Invalid child node found in\n$s") + } + } + + val isMainQueryAdaptive = plan.isInstanceOf[AdaptiveSparkPlanExec] + subqueriesAll(plan).filterNot(subqueryBroadcast.contains).foreach { + s => + val subquery = s match { + case r: ReusedSubqueryExec => r.child + case o => o + } + assert( + subquery.find(_.isInstanceOf[AdaptiveSparkPlanExec]).isDefined == isMainQueryAdaptive) + } + } - override val tableFormat: String = "hive" + override def checkDistinctSubqueries(df: DataFrame, n: Int): Unit = { + df.collect() - override def sparkConf: SparkConf = { - super.sparkConf - .set("spark.sql.catalogImplementation", "hive") + val buf = collectDynamicPruningExpressions(df.queryExecution.executedPlan).collect { + case InSubqueryExec(_, b: ColumnarSubqueryBroadcastExec, _, _, _, _) => + b + } + assert(buf.distinct.size == n) } } diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala index 07566cea7e3d..047118cca9df 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala @@ -16,18 +16,99 @@ */ package org.apache.spark.sql.hive -import org.apache.spark.SparkConf -import org.apache.spark.sql.GlutenDynamicPartitionPruningSuiteBase -import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} +import org.apache.gluten.execution.FileSourceScanExecTransformer + +import org.apache.spark.sql.{DataFrame, GlutenTestsBaseTrait} +import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} +import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec, InSubqueryExec, ReusedSubqueryExec, SparkPlan, SubqueryExec} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec} +import org.apache.spark.sql.hive.execution.HiveTableScanExec abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase - extends GlutenDynamicPartitionPruningSuiteBase { + extends DynamicPartitionPruningHiveScanSuiteBase + with GlutenTestsBaseTrait { + + override protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = { + flatMap(plan) { + case f: FileSourceScanExecTransformer => + f.partitionFilters.collect { case d: DynamicPruningExpression => d.child } + case h: HiveTableScanExec => + h.partitionPruningPred.collect { case d: DynamicPruningExpression => d.child } + case _ => Nil + } + } + + override def checkPartitionPruningPredicate( + df: DataFrame, + withSubquery: Boolean, + withBroadcast: Boolean): Unit = { + df.collect() + + val plan = df.queryExecution.executedPlan + val dpExprs = collectDynamicPruningExpressions(plan) + val hasSubquery = dpExprs.exists { + case InSubqueryExec(_, _: SubqueryExec, _, _, _, _) => true + case _ => false + } + val subqueryBroadcast = dpExprs.collect { + case InSubqueryExec(_, b: ColumnarSubqueryBroadcastExec, _, _, _, _) => b + } + + val hasFilter = if (withSubquery) "Should" else "Shouldn't" + assert( + hasSubquery == withSubquery, + s"$hasFilter trigger DPP with a subquery duplicate:\n${df.queryExecution}") + val hasBroadcast = if (withBroadcast) "Should" else "Shouldn't" + assert( + subqueryBroadcast.nonEmpty == withBroadcast, + s"$hasBroadcast trigger DPP with a reused broadcast exchange:\n${df.queryExecution}") + + subqueryBroadcast.foreach { + s => + s.child match { + case _: ReusedExchangeExec => // reuse check ok. + case a: AdaptiveSparkPlanExec => + val broadcastQueryStage = collectFirst(a) { case b: BroadcastQueryStageExec => b } + val broadcastPlan = broadcastQueryStage.get.broadcast + val hasReuse = find(plan) { + case ReusedExchangeExec(_, e) => e eq broadcastPlan + case b: BroadcastExchangeLike => b eq broadcastPlan + case _ => false + }.isDefined + assert(hasReuse, s"$s\nshould have been reused in\n$plan") + case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) => // reuse check ok. + case b: BroadcastExchangeLike => + val hasReuse = plan.find { + case ReusedExchangeExec(_, e) => e eq b + case _ => false + }.isDefined + assert(hasReuse, s"$s\nshould have been reused in\n$plan") + case _ => + fail(s"Invalid child node found in\n$s") + } + } + + val isMainQueryAdaptive = plan.isInstanceOf[AdaptiveSparkPlanExec] + subqueriesAll(plan).filterNot(subqueryBroadcast.contains).foreach { + s => + val subquery = s match { + case r: ReusedSubqueryExec => r.child + case o => o + } + assert( + subquery.find(_.isInstanceOf[AdaptiveSparkPlanExec]).isDefined == isMainQueryAdaptive) + } + } - override val tableFormat: String = "hive" + override def checkDistinctSubqueries(df: DataFrame, n: Int): Unit = { + df.collect() - override def sparkConf: SparkConf = { - super.sparkConf - .set("spark.sql.catalogImplementation", "hive") + val buf = collectDynamicPruningExpressions(df.queryExecution.executedPlan).collect { + case InSubqueryExec(_, b: ColumnarSubqueryBroadcastExec, _, _, _, _) => + b + } + assert(buf.distinct.size == n) } } From f6859aed6c77934ec71a0e98d8fa9535f04df2cc Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Mon, 9 Mar 2026 06:24:59 +0000 Subject: [PATCH 3/6] Test init --- ...DynamicPartitionPruningHiveScanSuite.scala | 36 ++++++++++++++++++- ...DynamicPartitionPruningHiveScanSuite.scala | 36 ++++++++++++++++++- 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala index 047118cca9df..461c014e49fb 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql.hive import org.apache.gluten.execution.FileSourceScanExecTransformer -import org.apache.spark.sql.{DataFrame, GlutenTestsBaseTrait} +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait, GlutenTestsBaseTrait} import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} +import org.apache.spark.sql.classic.SparkSession import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec, InSubqueryExec, ReusedSubqueryExec, SparkPlan, SubqueryExec} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec} @@ -29,6 +31,38 @@ abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase extends DynamicPartitionPruningHiveScanSuiteBase with GlutenTestsBaseTrait { + private var _spark: SparkSession = null + + override protected def spark: SparkSession = _spark + + override def sparkConf: SparkConf = { + GlutenSQLTestsBaseTrait.nativeSparkConf(super.sparkConf, warehouse) + } + + override def beforeAll(): Unit = { + if (_spark == null) { + _spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate() + } + super.beforeAll() + } + + override def afterAll(): Unit = { + try { + super.afterAll() + } finally { + try { + if (_spark != null) { + _spark.sessionState.catalog.reset() + _spark.stop() + _spark = null + } + } finally { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + } + } + } + override protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = { flatMap(plan) { case f: FileSourceScanExecTransformer => diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala index 047118cca9df..461c014e49fb 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql.hive import org.apache.gluten.execution.FileSourceScanExecTransformer -import org.apache.spark.sql.{DataFrame, GlutenTestsBaseTrait} +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait, GlutenTestsBaseTrait} import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} +import org.apache.spark.sql.classic.SparkSession import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec, InSubqueryExec, ReusedSubqueryExec, SparkPlan, SubqueryExec} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec} @@ -29,6 +31,38 @@ abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase extends DynamicPartitionPruningHiveScanSuiteBase with GlutenTestsBaseTrait { + private var _spark: SparkSession = null + + override protected def spark: SparkSession = _spark + + override def sparkConf: SparkConf = { + GlutenSQLTestsBaseTrait.nativeSparkConf(super.sparkConf, warehouse) + } + + override def beforeAll(): Unit = { + if (_spark == null) { + _spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate() + } + super.beforeAll() + } + + override def afterAll(): Unit = { + try { + super.afterAll() + } finally { + try { + if (_spark != null) { + _spark.sessionState.catalog.reset() + _spark.stop() + _spark = null + } + } finally { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + } + } + } + override protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = { flatMap(plan) { case f: FileSourceScanExecTransformer => From fb17879048d1bcbbf082368486460026c68eb63b Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Mon, 9 Mar 2026 09:53:42 +0000 Subject: [PATCH 4/6] Fix build --- .../hive/GlutenDynamicPartitionPruningHiveScanSuite.scala | 7 ++----- .../hive/GlutenDynamicPartitionPruningHiveScanSuite.scala | 7 ++----- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala index 461c014e49fb..1ffd7d6f5ccd 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala @@ -35,13 +35,10 @@ abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase override protected def spark: SparkSession = _spark - override def sparkConf: SparkConf = { - GlutenSQLTestsBaseTrait.nativeSparkConf(super.sparkConf, warehouse) - } - override def beforeAll(): Unit = { if (_spark == null) { - _spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate() + val conf = GlutenSQLTestsBaseTrait.nativeSparkConf(new SparkConf(), warehouse) + _spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate() } super.beforeAll() } diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala index 461c014e49fb..1ffd7d6f5ccd 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala @@ -35,13 +35,10 @@ abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase override protected def spark: SparkSession = _spark - override def sparkConf: SparkConf = { - GlutenSQLTestsBaseTrait.nativeSparkConf(super.sparkConf, warehouse) - } - override def beforeAll(): Unit = { if (_spark == null) { - _spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate() + val conf = GlutenSQLTestsBaseTrait.nativeSparkConf(new SparkConf(), warehouse) + _spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate() } super.beforeAll() } From 59e507d50e2dd677805585c83e27300416bd0b63 Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Tue, 10 Mar 2026 06:16:40 +0000 Subject: [PATCH 5/6] Fix test --- ...DynamicPartitionPruningHiveScanSuite.scala | 36 ++++--------------- ...DynamicPartitionPruningHiveScanSuite.scala | 36 ++++--------------- 2 files changed, 12 insertions(+), 60 deletions(-) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala index 1ffd7d6f5ccd..3bf9e03d6aec 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala @@ -19,45 +19,21 @@ package org.apache.spark.sql.hive import org.apache.gluten.execution.FileSourceScanExecTransformer import org.apache.spark.SparkConf -import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait, GlutenTestsBaseTrait} +import org.apache.spark.sql.{DynamicPartitionPruningSuiteBase, DataFrame, GlutenSQLTestsTrait} import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} -import org.apache.spark.sql.classic.SparkSession import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec, InSubqueryExec, ReusedSubqueryExec, SparkPlan, SubqueryExec} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec} import org.apache.spark.sql.hive.execution.HiveTableScanExec abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase - extends DynamicPartitionPruningHiveScanSuiteBase - with GlutenTestsBaseTrait { + extends DynamicPartitionPruningSuiteBase + with GlutenSQLTestsTrait { - private var _spark: SparkSession = null + override val tableFormat: String = "hive" - override protected def spark: SparkSession = _spark - - override def beforeAll(): Unit = { - if (_spark == null) { - val conf = GlutenSQLTestsBaseTrait.nativeSparkConf(new SparkConf(), warehouse) - _spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate() - } - super.beforeAll() - } - - override def afterAll(): Unit = { - try { - super.afterAll() - } finally { - try { - if (_spark != null) { - _spark.sessionState.catalog.reset() - _spark.stop() - _spark = null - } - } finally { - SparkSession.clearActiveSession() - SparkSession.clearDefaultSession() - } - } + override def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.catalogImplementation", "hive") } override protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = { diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala index 1ffd7d6f5ccd..3bf9e03d6aec 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala @@ -19,45 +19,21 @@ package org.apache.spark.sql.hive import org.apache.gluten.execution.FileSourceScanExecTransformer import org.apache.spark.SparkConf -import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait, GlutenTestsBaseTrait} +import org.apache.spark.sql.{DynamicPartitionPruningSuiteBase, DataFrame, GlutenSQLTestsTrait} import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} -import org.apache.spark.sql.classic.SparkSession import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec, InSubqueryExec, ReusedSubqueryExec, SparkPlan, SubqueryExec} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec} import org.apache.spark.sql.hive.execution.HiveTableScanExec abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase - extends DynamicPartitionPruningHiveScanSuiteBase - with GlutenTestsBaseTrait { + extends DynamicPartitionPruningSuiteBase + with GlutenSQLTestsTrait { - private var _spark: SparkSession = null + override val tableFormat: String = "hive" - override protected def spark: SparkSession = _spark - - override def beforeAll(): Unit = { - if (_spark == null) { - val conf = GlutenSQLTestsBaseTrait.nativeSparkConf(new SparkConf(), warehouse) - _spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate() - } - super.beforeAll() - } - - override def afterAll(): Unit = { - try { - super.afterAll() - } finally { - try { - if (_spark != null) { - _spark.sessionState.catalog.reset() - _spark.stop() - _spark = null - } - } finally { - SparkSession.clearActiveSession() - SparkSession.clearDefaultSession() - } - } + override def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.catalogImplementation", "hive") } override protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = { From 2994c257bbc6cff4833a20473f42b3675b5c92bd Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Tue, 10 Mar 2026 10:15:45 +0000 Subject: [PATCH 6/6] FIx format --- .../sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala index 3bf9e03d6aec..a611203241f4 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive import org.apache.gluten.execution.FileSourceScanExecTransformer import org.apache.spark.SparkConf -import org.apache.spark.sql.{DynamicPartitionPruningSuiteBase, DataFrame, GlutenSQLTestsTrait} +import org.apache.spark.sql.{DataFrame, DynamicPartitionPruningSuiteBase, GlutenSQLTestsTrait} import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec, InSubqueryExec, ReusedSubqueryExec, SparkPlan, SubqueryExec} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}