Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ case class DeletionVectorWriteTransformer(

override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType

override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None

override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ case class DeltaOptimizedWriterTransformer(

override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType

override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None

override protected def doExecute(): RDD[InternalRow] = {
child.execute()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class CHColumnarToCarrierRowExec(override val child: SparkPlan)
extends ColumnarToCarrierRowExecBase {
override protected def fromBatchType(): Convention.BatchType = CHBatchType
override def rowType0(): Convention.RowType = CHCarrierRowType
override def rowType(): Convention.RowType = CHCarrierRowType
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
// Since https://github.com/apache/incubator-gluten/pull/1595.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ case class GlutenDeltaOptimizedWriterExec(

override def batchType(): Convention.BatchType = VeloxBatchType

override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ object GlutenDeltaJobStatsTracker extends Logging {
with LeafExecNode {
override def output: Seq[Attribute] = keySchema ++ dataSchema
override def batchType(): Convention.BatchType = VeloxBatchType
override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
throw new UnsupportedOperationException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ case class ColumnarPartialGenerateExec(generateExec: GenerateExec, child: SparkP

override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType

override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None

final override def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ case class ColumnarPartialProjectExec(projectList: Seq[Expression], child: Spark

override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType

override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None

final override def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,5 @@ case class GpuResizeBufferColumnarBatchExec(override val child: SparkPlan, minOu

override def batchType(): Convention.BatchType = VeloxBatchType

override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.SparkPlan
case class VeloxColumnarToCarrierRowExec(override val child: SparkPlan)
extends ColumnarToCarrierRowExecBase {
override protected def fromBatchType(): Convention.BatchType = VeloxBatchType
override def rowType0(): Convention.RowType = VeloxCarrierRowType
override def rowType(): Convention.RowType = VeloxCarrierRowType
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,5 @@ case class VeloxResizeBatchesExec(

override def batchType(): Convention.BatchType = VeloxBatchType

override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ case class ColumnarArrowEvalPythonExec(

override def batchType(): Convention.BatchType = ArrowJavaBatchType

override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None

override protected def doValidateInternal(): ValidationResult = {
val (_, inputs) = udfs.map(ColumnarArrowEvalPythonExec.collectFunctions).unzip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ trait BaseArrowScanExec extends GlutenPlan {
ArrowBatchTypes.ArrowJavaBatchType
}

final override def rowType0(): Convention.RowType = Convention.RowType.None
final override def rowType(): Convention.RowType = Convention.RowType.None
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ package org.apache.gluten.execution

import org.apache.gluten.config.{GlutenConfig, GlutenCoreConfig, VeloxConfig}
import org.apache.gluten.expression.VeloxDummyExpression
import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.joins.BaseJoinExec
Expand Down Expand Up @@ -911,18 +910,9 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa

test("Verify parquet field name with special character") {
withTable("t") {

// https://github.com/apache/spark/pull/35229 Spark remove parquet field name check after 3.2
if (!SparkShimLoader.getSparkVersion.startsWith("3.2")) {
sql("create table t using parquet as select sum(l_partkey) from lineitem")
runQueryAndCompare("select * from t") {
checkGlutenPlan[FileSourceScanExecTransformer]
}
} else {
val msg = intercept[AnalysisException] {
sql("create table t using parquet as select sum(l_partkey) from lineitem")
}.message
assert(msg.contains("contains invalid character"))
sql("create table t using parquet as select sum(l_partkey) from lineitem")
runQueryAndCompare("select * from t") {
checkGlutenPlan[FileSourceScanExecTransformer]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ trait GlutenColumnarToColumnarTransition extends ColumnarToColumnarTransition wi

override def batchType(): Convention.BatchType = to

override def rowType0(): Convention.RowType = {
override def rowType(): Convention.RowType = {
Convention.RowType.None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.SparkPlan
*
* Instead, subclasses are expected to implement the following APIs:
* - batchType
* - rowType0
* - rowType
* - requiredChildConvention (optional)
*
* With implementations of the APIs provided, Gluten query planner will be able to find and insert
Expand All @@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.SparkPlan
trait GlutenPlan
extends SparkPlan
with Convention.KnownBatchType
with Convention.KnownRowTypeForSpark33OrLater
with Convention.KnownRowType
with GlutenPlan.SupportsRowBasedCompatible
with ConventionReq.KnownChildConvention {

Expand All @@ -59,7 +59,7 @@ trait GlutenPlan

override def batchType(): Convention.BatchType

override def rowType0(): Convention.RowType
override def rowType(): Convention.RowType

override def requiredChildConvention(): Seq[ConventionReq] = {
// In the normal case, children's convention should follow parent node's convention.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean
case class GroupLeafExec(groupId: Int, metadata: GlutenMetadata, convReq: Conv.Req)
extends LeafExecNode
with Convention.KnownBatchType
with Convention.KnownRowTypeForSpark33OrLater
with Convention.KnownRowType
with GlutenPlan.SupportsRowBasedCompatible {

private val frozen = new AtomicBoolean(false)
Expand All @@ -62,7 +62,7 @@ case class GroupLeafExec(groupId: Int, metadata: GlutenMetadata, convReq: Conv.R
batchType != Convention.BatchType.None
}

override val rowType0: Convention.RowType = {
override val rowType: Convention.RowType = {
val out = convReq.req.requiredRowType match {
case ConventionReq.RowType.Any => Convention.RowType.VanillaRowType
case ConventionReq.RowType.Is(r) => r
Expand All @@ -71,7 +71,7 @@ case class GroupLeafExec(groupId: Int, metadata: GlutenMetadata, convReq: Conv.R
}

final override val supportsRowBased: Boolean = {
rowType() != Convention.RowType.None
rowType != Convention.RowType.None
}

private def ensureNotFrozen(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ object OffloadSingleNode {
private lazy val conv: Convention = Convention.get(hiddenPlan)

override def batchType(): Convention.BatchType = conv.batchType
override def rowType0(): Convention.RowType = conv.rowType
override def rowType(): Convention.RowType = conv.rowType
override def output: Seq[Attribute] = hiddenPlan.output
override def outputPartitioning: Partitioning = hiddenPlan.outputPartitioning
override def outputOrdering: Seq[SortOrder] = hiddenPlan.outputOrdering
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.gluten.extension.columnar.transition

import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
import org.apache.spark.util.SparkVersionUtil

import scala.collection.mutable

Expand Down Expand Up @@ -163,30 +162,7 @@ object Convention {
def batchType(): BatchType
}

sealed trait KnownRowType {
trait KnownRowType {
def rowType(): RowType
}

trait KnownRowTypeForSpark33OrLater extends KnownRowType {
this: SparkPlan =>

final override def rowType(): RowType = {
if (SparkVersionUtil.lteSpark32) {
// It's known that in Spark 3.2, one Spark plan node is considered either only having
// row-based support or only having columnar support at a time.
// Hence, if the plan supports columnar output, we'd disable its row-based support.
// The same for the opposite.
if (supportsColumnar) {
Convention.RowType.None
} else {
assert(rowType0() != Convention.RowType.None)
rowType0()
}
} else {
rowType0()
}
}

def rowType0(): RowType
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.gluten.config.GlutenCoreConfig

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.util.{SparkVersionUtil, Utils}
import org.apache.spark.util.Utils

/**
* This [[CostEvaluator]] is to force use the new physical plan when cost is equal.
Expand All @@ -30,16 +30,11 @@ import org.apache.spark.util.{SparkVersionUtil, Utils}
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {

private val vanillaCostEvaluator: CostEvaluator = {
if (SparkVersionUtil.lteSpark32) {
val clazz = Utils.classForName("org.apache.spark.sql.execution.adaptive.SimpleCostEvaluator$")
clazz.getDeclaredField("MODULE$").get(null).asInstanceOf[CostEvaluator]
} else {
val forceOptimizeSkewedJoin =
conf.getConfString("spark.sql.adaptive.forceOptimizeSkewedJoin").toBoolean
val clazz = Utils.classForName("org.apache.spark.sql.execution.adaptive.SimpleCostEvaluator")
val ctor = clazz.getConstructor(classOf[Boolean])
ctor.newInstance(forceOptimizeSkewedJoin.asInstanceOf[Object]).asInstanceOf[CostEvaluator]
}
val forceOptimizeSkewedJoin =
conf.getConfString("spark.sql.adaptive.forceOptimizeSkewedJoin").toBoolean
val clazz = Utils.classForName("org.apache.spark.sql.execution.adaptive.SimpleCostEvaluator")
val ctor = clazz.getConstructor(classOf[Boolean])
ctor.newInstance(forceOptimizeSkewedJoin.asInstanceOf[Object]).asInstanceOf[CostEvaluator]
}

override def evaluateCost(plan: SparkPlan): Cost = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@ import org.apache.spark.sql.internal.SQLConf
object SparkPlanUtil {

def supportsRowBased(plan: SparkPlan): Boolean = {
if (SparkVersionUtil.lteSpark32) {
return !plan.supportsColumnar
}

val m = classOf[SparkPlan].getMethod("supportsRowBased")
m.invoke(plan).asInstanceOf[Boolean]
}

def isPlannedV1Write(plan: DataWritingCommandExec): Boolean = {
if (SparkVersionUtil.lteSpark33) {
if (SparkVersionUtil.eqSpark33) {
return false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,6 @@ object SparkTaskUtil {
ctors.head
}

if (SparkVersionUtil.lteSpark32) {
return ctor
.newInstance(
stageId,
stageAttemptNumber,
partitionId,
taskAttemptId,
attemptNumber,
taskMemoryManager,
localProperties,
metricsSystem,
taskMetrics,
resources
)
.asInstanceOf[TaskContext]
}

if (SparkVersionUtil.eqSpark33) {
return ctor
.newInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
package org.apache.spark.util

object SparkVersionUtil {
val lteSpark32: Boolean = compareMajorMinorVersion((3, 2)) <= 0
private val comparedWithSpark33 = compareMajorMinorVersion((3, 3))
private val comparedWithSpark35 = compareMajorMinorVersion((3, 5))
val eqSpark33: Boolean = comparedWithSpark33 == 0
val lteSpark33: Boolean = lteSpark32 || eqSpark33
val gteSpark33: Boolean = comparedWithSpark33 >= 0
val gteSpark35: Boolean = comparedWithSpark35 >= 0
val gteSpark40: Boolean = compareMajorMinorVersion((4, 0)) >= 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ case class ColumnarUnionExec(children: Seq[SparkPlan], partitioning: Partitionin

override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType

override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None

override def outputPartitioning: Partitioning = partitioning

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import java.io.{IOException, ObjectOutputStream}
case class ColumnarCartesianProductBridge(child: SparkPlan) extends UnaryExecNode with GlutenPlan {
override def output: Seq[Attribute] = child.output
override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType
override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] =
throw new UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = child.executeColumnar()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ case class ColumnarCoalesceExec(numPartitions: Int, child: SparkPlan)

override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType

override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None

override protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ abstract class ColumnarCollectLimitBaseExec(
) extends LimitExec
with ValidatablePlan {

override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None

override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ abstract class ColumnarCollectTailBaseExec(
.genColumnarShuffleExchangeMetrics(sparkContext, shuffleWriterType) ++
readMetrics ++ writeMetrics

override def rowType0(): Convention.RowType = Convention.RowType.None
override def rowType(): Convention.RowType = Convention.RowType.None

override def output: Seq[Attribute] = child.output

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ abstract class ColumnarToRowExecBase(child: SparkPlan)

override def batchType(): Convention.BatchType = Convention.BatchType.None

override def rowType0(): Convention.RowType = Convention.RowType.VanillaRowType
override def rowType(): Convention.RowType = Convention.RowType.VanillaRowType

override def requiredChildConvention(): Seq[ConventionReq] = {
List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ trait ColumnarV2TableWriteExec extends V2TableWriteExec with ValidatablePlan {

override def batchType(): Convention.BatchType = Convention.BatchType.None

override def rowType0(): Convention.RowType = RowType.VanillaRowType
override def rowType(): Convention.RowType = RowType.VanillaRowType

override def requiredChildConvention(): Seq[ConventionReq] = Seq(
ConventionReq.ofBatch(
Expand Down
Loading
Loading