diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 684fbd36f1ac..f0b6491e7d04 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -202,7 +202,6 @@ object OffloadOthers {
       case plan: FileSourceScanExec =>
         ScanTransformerFactory.createFileSourceScanTransformer(plan)
       case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
-        // TODO: Add DynamicPartitionPruningHiveScanSuite.scala
         HiveTableScanExecTransformer(plan)
       case plan: CoalesceExec =>
         ColumnarCoalesceExec(plan.numPartitions, plan.child)
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index b971eb46bc2c..75f2944ec290 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -76,8 +76,24 @@ case class HiveTableScanExecTransformer(
   override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
     partitionWithReadFileFormats
 
-  override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] =
-    distinctReadFileFormats
+  /**
+   * Returns the distinct read file formats reported for this scan.
+   *
+   * Unpartitioned relations, and partitioned relations whose `relation.prunedPartitions` is
+   * absent or empty, report the single scan-level `fileFormat`. Otherwise the result is the
+   * set of formats derived from the storage of each statically pruned partition.
+   */
+  override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] = {
+    // Read the statically pruned partitions held by the relation (`relation.prunedPartitions`)
+    // rather than the unqualified `prunedPartitions`, to avoid triggering DPP (Dynamic
+    // Partition Pruning) subquery evaluation during validation, when those subqueries have
+    // not yet been executed.
+    val staticPartitions =
+      if (relation.isPartitioned) relation.prunedPartitions.getOrElse(Nil) else Nil
+    if (staticPartitions.isEmpty) {
+      Set(fileFormat)
+    } else {
+      staticPartitions.map(p => getReadFileFormat(p.storage)).toSet
+    }
+  }
 
   override def getPartitionSchema: StructType = relation.tableMeta.partitionSchema
 
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 399661654ff6..b0241b313c06 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.metric.{GlutenCustomMetricsSuite, GlutenSQ
 import org.apache.spark.sql.execution.python._
 import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
 import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite}
+import org.apache.spark.sql.hive.{GlutenDynamicPartitionPruningHiveScanSuiteAEOff, GlutenDynamicPartitionPruningHiveScanSuiteAEOn}
 import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming._
@@ -925,6 +926,12 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOn]
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOnDisableScan]
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffDisableScan]
+  enableSuite[GlutenDynamicPartitionPruningHiveScanSuiteAEOff]
+    .exclude("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec")
+    .exclude("Make sure dynamic pruning works on uncorrelated queries")
+  enableSuite[GlutenDynamicPartitionPruningHiveScanSuiteAEOn]
+    .exclude("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec")
+    .exclude("Make sure dynamic pruning works on
uncorrelated queries")
   enableSuite[GlutenExpressionsSchemaSuite]
   enableSuite[GlutenExtraStrategiesSuite]
   enableSuite[GlutenFileBasedDataSourceSuite]
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala
new file mode 100644
index 000000000000..a611203241f4
--- /dev/null
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.gluten.execution.FileSourceScanExecTransformer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, DynamicPartitionPruningSuiteBase, GlutenSQLTestsTrait}
+import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
+import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec, InSubqueryExec, ReusedSubqueryExec, SparkPlan, SubqueryExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+
+/**
+ * Dynamic partition pruning (DPP) suite that sets the table format to "hive" and sets
+ * `spark.sql.catalogImplementation` to "hive". Plan inspection is overridden so that Gluten's
+ * columnar scan and broadcast-subquery operators are recognised.
+ */
+abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase
+  extends DynamicPartitionPruningSuiteBase
+  with GlutenSQLTestsTrait {
+
+  override val tableFormat: String = "hive"
+
+  override def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalogImplementation", "hive")
+  }
+
+  /**
+   * Collects the child of every [[DynamicPruningExpression]] attached to a scan node of `plan`.
+   * Offloaded Hive scans ([[HiveTableScanExecTransformer]]) are inspected alongside the vanilla
+   * [[HiveTableScanExec]], so DPP filters are found whether or not the scan was offloaded.
+   */
+  override protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = {
+    def dynamicPruningChildren(filters: Seq[Expression]): Seq[Expression] =
+      filters.collect { case d: DynamicPruningExpression => d.child }
+
+    flatMap(plan) {
+      case f: FileSourceScanExecTransformer => dynamicPruningChildren(f.partitionFilters)
+      case h: HiveTableScanExecTransformer => dynamicPruningChildren(h.partitionPruningPred)
+      case h: HiveTableScanExec => dynamicPruningChildren(h.partitionPruningPred)
+      case _ => Nil
+    }
+  }
+
+  /**
+   * Executes `df`, then asserts which kinds of DPP filter its executed plan contains.
+   *
+   * @param df
+   *   query under test; it is collected before the plan is inspected
+   * @param withSubquery
+   *   whether a DPP filter backed by a [[SubqueryExec]] must be present
+   * @param withBroadcast
+   *   whether a DPP filter backed by a [[ColumnarSubqueryBroadcastExec]] must be present
+   */
+  override def checkPartitionPruningPredicate(
+      df: DataFrame,
+      withSubquery: Boolean,
+      withBroadcast: Boolean): Unit = {
+    df.collect()
+
+    val plan = df.queryExecution.executedPlan
+    val dpExprs = collectDynamicPruningExpressions(plan)
+    val hasSubquery = dpExprs.exists {
+      case InSubqueryExec(_, _: SubqueryExec, _, _, _, _) => true
+      case _ => false
+    }
+    val subqueryBroadcast = dpExprs.collect {
+      case InSubqueryExec(_, b: ColumnarSubqueryBroadcastExec, _, _, _, _) => b
+    }
+
+    val subqueryPrefix = if (withSubquery) "Should" else "Shouldn't"
+    assert(
+      hasSubquery == withSubquery,
+      s"$subqueryPrefix trigger DPP with a subquery duplicate:\n${df.queryExecution}")
+    val broadcastPrefix = if (withBroadcast) "Should" else "Shouldn't"
+    assert(
+      subqueryBroadcast.nonEmpty == withBroadcast,
+      s"$broadcastPrefix trigger DPP with a reused broadcast exchange:\n${df.queryExecution}")
+
+    // Each broadcast-backed DPP subquery must share its broadcast exchange with the main plan.
+    subqueryBroadcast.foreach {
+      s =>
+        s.child match {
+          case _: ReusedExchangeExec => // reuse check ok.
+          case a: AdaptiveSparkPlanExec =>
+            val broadcastQueryStage = collectFirst(a) { case b: BroadcastQueryStageExec => b }
+            // Fail with the offending plan instead of a bare NoSuchElementException.
+            val broadcastPlan = broadcastQueryStage
+              .getOrElse(fail(s"No BroadcastQueryStageExec found in\n$a"))
+              .broadcast
+            val hasReuse = find(plan) {
+              case ReusedExchangeExec(_, e) => e eq broadcastPlan
+              case b: BroadcastExchangeLike => b eq broadcastPlan
+              case _ => false
+            }.isDefined
+            assert(hasReuse, s"$s\nshould have been reused in\n$plan")
+          case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) => // reuse check ok.
+          case b: BroadcastExchangeLike =>
+            val hasReuse = plan.find {
+              case ReusedExchangeExec(_, e) => e eq b
+              case _ => false
+            }.isDefined
+            assert(hasReuse, s"$s\nshould have been reused in\n$plan")
+          case _ =>
+            fail(s"Invalid child node found in\n$s")
+        }
+    }
+
+    // Subqueries other than the DPP broadcasts must be adaptive exactly when the main query is.
+    val isMainQueryAdaptive = plan.isInstanceOf[AdaptiveSparkPlanExec]
+    subqueriesAll(plan).filterNot(subqueryBroadcast.contains).foreach {
+      s =>
+        val subquery = s match {
+          case r: ReusedSubqueryExec => r.child
+          case o => o
+        }
+        assert(
+          subquery.find(_.isInstanceOf[AdaptiveSparkPlanExec]).isDefined == isMainQueryAdaptive,
+          s"Subquery and main query disagree on adaptive execution:\n$s")
+    }
+  }
+
+  /**
+   * Executes `df` and asserts that its executed plan holds exactly `n` distinct
+   * [[ColumnarSubqueryBroadcastExec]] nodes used as DPP filters.
+   */
+  override def checkDistinctSubqueries(df: DataFrame, n: Int): Unit = {
+    df.collect()
+
+    val buf = collectDynamicPruningExpressions(df.queryExecution.executedPlan).collect {
+      case InSubqueryExec(_, b: ColumnarSubqueryBroadcastExec, _, _, _, _) =>
+        b
+    }
+    val distinctCount = buf.distinct.size
+    assert(
+      distinctCount == n,
+      s"Expected $n distinct DPP broadcast subqueries, got $distinctCount")
+  }
+}
+
+/** Runs the Hive-scan DPP checks with [[DisableAdaptiveExecutionSuite]] mixed in. */
+class GlutenDynamicPartitionPruningHiveScanSuiteAEOff
+  extends GlutenDynamicPartitionPruningHiveScanSuiteBase
+  with DisableAdaptiveExecutionSuite
+
+/** Runs the Hive-scan DPP checks with [[EnableAdaptiveExecutionSuite]] mixed in. */
+class GlutenDynamicPartitionPruningHiveScanSuiteAEOn
+  extends GlutenDynamicPartitionPruningHiveScanSuiteBase
+  with EnableAdaptiveExecutionSuite
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 10802c889295..ec6b4506cfee 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.metric.{GlutenCustomMetricsSuite, GlutenSQ
 import org.apache.spark.sql.execution.python._
 import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
 import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite}
+import org.apache.spark.sql.hive.{GlutenDynamicPartitionPruningHiveScanSuiteAEOff, GlutenDynamicPartitionPruningHiveScanSuiteAEOn}
 import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming._
@@ -898,6 +899,12 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOn]
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOnDisableScan]
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffDisableScan]
+  enableSuite[GlutenDynamicPartitionPruningHiveScanSuiteAEOff]
+    .exclude("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec")
+    .exclude("Make sure dynamic pruning works on uncorrelated queries")
+  enableSuite[GlutenDynamicPartitionPruningHiveScanSuiteAEOn]
+    .exclude("SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec")
+    .exclude("Make sure dynamic pruning works on uncorrelated queries")
   enableSuite[GlutenExpressionsSchemaSuite]
   enableSuite[GlutenExtraStrategiesSuite]
enableSuite[GlutenFileBasedDataSourceSuite]
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala
new file mode 100644
index 000000000000..3bf9e03d6aec
--- /dev/null
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/hive/GlutenDynamicPartitionPruningHiveScanSuite.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.gluten.execution.FileSourceScanExecTransformer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, DynamicPartitionPruningSuiteBase, GlutenSQLTestsTrait}
+import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
+import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec, InSubqueryExec, ReusedSubqueryExec, SparkPlan, SubqueryExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+
+/**
+ * Dynamic partition pruning (DPP) suite that sets the table format to "hive" and sets
+ * `spark.sql.catalogImplementation` to "hive". Plan inspection is overridden so that Gluten's
+ * columnar scan and broadcast-subquery operators are recognised.
+ */
+abstract class GlutenDynamicPartitionPruningHiveScanSuiteBase
+  extends DynamicPartitionPruningSuiteBase
+  with GlutenSQLTestsTrait {
+
+  override val tableFormat: String = "hive"
+
+  override def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalogImplementation", "hive")
+  }
+
+  /**
+   * Collects the child of every [[DynamicPruningExpression]] attached to a scan node of `plan`.
+   * Offloaded Hive scans ([[HiveTableScanExecTransformer]]) are inspected alongside the vanilla
+   * [[HiveTableScanExec]], so DPP filters are found whether or not the scan was offloaded.
+   */
+  override protected def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = {
+    def dynamicPruningChildren(filters: Seq[Expression]): Seq[Expression] =
+      filters.collect { case d: DynamicPruningExpression => d.child }
+
+    flatMap(plan) {
+      case f: FileSourceScanExecTransformer => dynamicPruningChildren(f.partitionFilters)
+      case h: HiveTableScanExecTransformer => dynamicPruningChildren(h.partitionPruningPred)
+      case h: HiveTableScanExec => dynamicPruningChildren(h.partitionPruningPred)
+      case _ => Nil
+    }
+  }
+
+  /**
+   * Executes `df`, then asserts which kinds of DPP filter its executed plan contains.
+   *
+   * @param df
+   *   query under test; it is collected before the plan is inspected
+   * @param withSubquery
+   *   whether a DPP filter backed by a [[SubqueryExec]] must be present
+   * @param withBroadcast
+   *   whether a DPP filter backed by a [[ColumnarSubqueryBroadcastExec]] must be present
+   */
+  override def checkPartitionPruningPredicate(
+      df: DataFrame,
+      withSubquery: Boolean,
+      withBroadcast: Boolean): Unit = {
+    df.collect()
+
+    val plan = df.queryExecution.executedPlan
+    val dpExprs = collectDynamicPruningExpressions(plan)
+    val hasSubquery = dpExprs.exists {
+      case InSubqueryExec(_, _: SubqueryExec, _, _, _, _) => true
+      case _ => false
+    }
+    val subqueryBroadcast = dpExprs.collect {
+      case InSubqueryExec(_, b: ColumnarSubqueryBroadcastExec, _, _, _, _) => b
+    }
+
+    val subqueryPrefix = if (withSubquery) "Should" else "Shouldn't"
+    assert(
+      hasSubquery == withSubquery,
+      s"$subqueryPrefix trigger DPP with a subquery duplicate:\n${df.queryExecution}")
+    val broadcastPrefix = if (withBroadcast) "Should" else "Shouldn't"
+    assert(
+      subqueryBroadcast.nonEmpty == withBroadcast,
+      s"$broadcastPrefix trigger DPP with a reused broadcast exchange:\n${df.queryExecution}")
+
+    // Each broadcast-backed DPP subquery must share its broadcast exchange with the main plan.
+    subqueryBroadcast.foreach {
+      s =>
+        s.child match {
+          case _: ReusedExchangeExec => // reuse check ok.
+          case a: AdaptiveSparkPlanExec =>
+            val broadcastQueryStage = collectFirst(a) { case b: BroadcastQueryStageExec => b }
+            // Fail with the offending plan instead of a bare NoSuchElementException.
+            val broadcastPlan = broadcastQueryStage
+              .getOrElse(fail(s"No BroadcastQueryStageExec found in\n$a"))
+              .broadcast
+            val hasReuse = find(plan) {
+              case ReusedExchangeExec(_, e) => e eq broadcastPlan
+              case b: BroadcastExchangeLike => b eq broadcastPlan
+              case _ => false
+            }.isDefined
+            assert(hasReuse, s"$s\nshould have been reused in\n$plan")
+          case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) => // reuse check ok.
+          case b: BroadcastExchangeLike =>
+            val hasReuse = plan.find {
+              case ReusedExchangeExec(_, e) => e eq b
+              case _ => false
+            }.isDefined
+            assert(hasReuse, s"$s\nshould have been reused in\n$plan")
+          case _ =>
+            fail(s"Invalid child node found in\n$s")
+        }
+    }
+
+    // Subqueries other than the DPP broadcasts must be adaptive exactly when the main query is.
+    val isMainQueryAdaptive = plan.isInstanceOf[AdaptiveSparkPlanExec]
+    subqueriesAll(plan).filterNot(subqueryBroadcast.contains).foreach {
+      s =>
+        val subquery = s match {
+          case r: ReusedSubqueryExec => r.child
+          case o => o
+        }
+        assert(
+          subquery.find(_.isInstanceOf[AdaptiveSparkPlanExec]).isDefined == isMainQueryAdaptive,
+          s"Subquery and main query disagree on adaptive execution:\n$s")
+    }
+  }
+
+  /**
+   * Executes `df` and asserts that its executed plan holds exactly `n` distinct
+   * [[ColumnarSubqueryBroadcastExec]] nodes used as DPP filters.
+   */
+  override def checkDistinctSubqueries(df: DataFrame, n: Int): Unit = {
+    df.collect()
+
+    val buf = collectDynamicPruningExpressions(df.queryExecution.executedPlan).collect {
+      case InSubqueryExec(_, b: ColumnarSubqueryBroadcastExec, _, _, _, _) =>
+        b
+    }
+    val distinctCount = buf.distinct.size
+    assert(
+      distinctCount == n,
+      s"Expected $n distinct DPP broadcast subqueries, got $distinctCount")
+  }
+}
+
+/** Runs the Hive-scan DPP checks with [[DisableAdaptiveExecutionSuite]] mixed in. */
+class GlutenDynamicPartitionPruningHiveScanSuiteAEOff
+  extends GlutenDynamicPartitionPruningHiveScanSuiteBase
+  with DisableAdaptiveExecutionSuite
+
+/** Runs the Hive-scan DPP checks with [[EnableAdaptiveExecutionSuite]] mixed in. */
+class GlutenDynamicPartitionPruningHiveScanSuiteAEOn
+  extends GlutenDynamicPartitionPruningHiveScanSuiteBase
+  with EnableAdaptiveExecutionSuite