diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 834127e20cc1..f754f4ae0bbf 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1495,6 +1495,31 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: // The columns present in the table, if not available default to the baseSchema. auto tableSchema = splitInfo->tableSchema ? splitInfo->tableSchema : baseSchema; + // Build dataColumns from tableSchema, excluding partition columns. + // HiveTableHandle::dataColumns() is used as fileSchema for the reader. + // Partition columns should not be validated against the file's physical types + // (their values come from the partition path, not from the file). + std::unordered_set<std::string> partitionColNames; + for (int idx = 0; idx < colNameList.size(); idx++) { + if (columnTypes[idx] == ColumnType::kPartitionKey) { + partitionColNames.insert(colNameList[idx]); + } + } + RowTypePtr dataColumns; + if (partitionColNames.empty()) { + dataColumns = tableSchema; + } else { + std::vector<std::string> dataColNames; + std::vector<TypePtr> dataColTypes; + for (int idx = 0; idx < tableSchema->size(); idx++) { + if (partitionColNames.find(tableSchema->nameOf(idx)) == partitionColNames.end()) { + dataColNames.push_back(tableSchema->nameOf(idx)); + dataColTypes.push_back(tableSchema->childAt(idx)); + } + } + dataColumns = ROW(std::move(dataColNames), std::move(dataColTypes)); + } + connector::ConnectorTableHandlePtr tableHandle; auto remainingFilter = readRel.has_filter() ? 
exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr; auto connectorId = kHiveConnectorId; @@ -1506,7 +1531,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } common::SubfieldFilters subfieldFilters; tableHandle = std::make_shared<connector::hive::HiveTableHandle>( - connectorId, "hive_table", std::move(subfieldFilters), remainingFilter, tableSchema); + connectorId, "hive_table", std::move(subfieldFilters), remainingFilter, dataColumns); // Get assignments and out names. std::vector<std::string> outNames; diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index c3e3cf0f427a..32ad05d1e7fb 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -17,8 +17,8 @@ set -exu CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) -VELOX_REPO=https://github.com/IBM/velox.git -VELOX_BRANCH=dft-2026_03_10-iceberg +VELOX_REPO=https://github.com/baibaichen/velox.git +VELOX_BRANCH=pr3/parquet-type-widening VELOX_ENHANCED_BRANCH=ibm-2026_03_10 VELOX_HOME="" RUN_SETUP_SCRIPT=ON diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 4f7c67daaad6..7ddc78efb537 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -323,10 +323,12 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenParquetCommitterSuite] enableSuite[GlutenParquetFieldIdSchemaSuite] enableSuite[GlutenParquetTypeWideningSuite] + // Velox does not support DELTA_BYTE_ARRAY encoding for FIXED_LEN_BYTE_ARRAY decimals. 
.exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(22, 2)") .exclude("parquet decimal precision and scale change Decimal(20, 7) -> Decimal(22, 5)") .exclude("parquet decimal precision and scale change Decimal(20, 5) -> Decimal(22, 8)") .exclude("parquet decimal precision and scale change Decimal(20, 2) -> Decimal(22, 4)") + // Velox native reader aligns with vectorized reader behavior, always rejecting incompatible decimal conversions. .exclude("parquet decimal precision and scale change Decimal(10, 4) -> Decimal(12, 7)") .exclude("parquet decimal precision and scale change Decimal(10, 6) -> Decimal(12, 4)") .exclude("parquet decimal precision and scale change Decimal(10, 7) -> Decimal(5, 2)") @@ -338,22 +340,12 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("parquet decimal precision and scale change Decimal(22, 5) -> Decimal(20, 7)") .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(6, 4)") .exclude("parquet decimal precision and scale change Decimal(7, 4) -> Decimal(5, 2)") - .exclude("parquet decimal precision and scale change Decimal(10, 2) -> Decimal(12, 4)") - .exclude("parquet decimal precision and scale change Decimal(10, 2) -> Decimal(20, 12)") - .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(10, 7)") - .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(20, 17)") - .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(7, 4)") .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(5, 2)") .exclude("parquet decimal precision change Decimal(12, 2) -> Decimal(10, 2)") .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(10, 2)") .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(5, 2)") .exclude("parquet decimal precision change Decimal(22, 2) -> Decimal(20, 2)") .exclude("parquet decimal precision change Decimal(7, 2) -> Decimal(5, 2)") - .exclude("parquet decimal 
precision change Decimal(10, 2) -> Decimal(12, 2)") - .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(20, 2)") - .exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(10, 2)") - .exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(20, 2)") - .exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(7, 2)") .exclude("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") .exclude("unsupported parquet conversion ByteType -> DecimalType(1,0)") .exclude("unsupported parquet conversion ByteType -> DecimalType(2,0)") @@ -363,29 +355,14 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("unsupported parquet conversion IntegerType -> DecimalType(10,1)") .exclude("unsupported parquet conversion IntegerType -> DecimalType(5,0)") .exclude("unsupported parquet conversion IntegerType -> DecimalType(9,0)") - .exclude("unsupported parquet conversion LongType -> DateType") .exclude("unsupported parquet conversion LongType -> DecimalType(10,0)") .exclude("unsupported parquet conversion LongType -> DecimalType(19,0)") .exclude("unsupported parquet conversion LongType -> DecimalType(20,1)") - .exclude("unsupported parquet conversion LongType -> IntegerType") .exclude("unsupported parquet conversion ShortType -> DecimalType(3,0)") .exclude("unsupported parquet conversion ShortType -> DecimalType(4,0)") .exclude("unsupported parquet conversion ShortType -> DecimalType(5,0)") .exclude("unsupported parquet conversion ShortType -> DecimalType(5,1)") .exclude("unsupported parquet conversion ShortType -> DecimalType(6,1)") - .exclude("parquet widening conversion ByteType -> DecimalType(11,1)") - .exclude("parquet widening conversion ByteType -> DecimalType(20,0)") - .exclude("parquet widening conversion IntegerType -> DecimalType(11,1)") - .exclude("parquet widening conversion IntegerType -> DecimalType(20,0)") - .exclude("parquet widening conversion IntegerType -> DecimalType(38,0)") 
- .exclude("parquet widening conversion IntegerType -> DoubleType") - .exclude("parquet widening conversion LongType -> DecimalType(20,0)") - .exclude("parquet widening conversion LongType -> DecimalType(21,1)") - .exclude("parquet widening conversion LongType -> DecimalType(38,0)") - .exclude("parquet widening conversion ShortType -> DecimalType(11,1)") - .exclude("parquet widening conversion ShortType -> DecimalType(20,0)") - .exclude("parquet widening conversion ShortType -> DecimalType(38,0)") - .exclude("parquet widening conversion ShortType -> DoubleType") enableSuite[GlutenParquetVariantShreddingSuite] // Generated suites for org.apache.spark.sql.execution.datasources.text // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala index 2090b70f7727..b785dce559e3 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala @@ -16,6 +16,227 @@ */ package org.apache.spark.sql.execution.datasources.parquet -import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.gluten.config.GlutenConfig -class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait {} +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait} +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.DecimalType.{ByteDecimal, 
IntDecimal, LongDecimal, ShortDecimal} + +import org.apache.hadoop.fs.Path +import org.apache.parquet.column.{Encoding, ParquetProperties} +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat} + +import java.io.File + +class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait { + + import testImplicits._ + + // Disable native writer so that writeParquetFiles() uses Spark's Parquet writer. + // This suite tests the READ path. The native writer doesn't produce + // DELTA_BINARY_PACKED/DELTA_BYTE_ARRAY encodings that the parent test's + // V2 encoding assertions expect. + override def sparkConf: SparkConf = + super.sparkConf.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "false") + + // ====== Private methods copied from ParquetTypeWideningSuite ====== + // These are private in the parent class, so we must copy them to use in overridden tests. + // The key change: removed withAllParquetReaders wrapper since Velox native reader + // always behaves like the vectorized reader. 
+ + private def checkAllParquetReaders( + values: Seq[String], + fromType: DataType, + toType: DataType, + expectError: Boolean): Unit = { + val timestampRebaseModes = toType match { + case _: TimestampNTZType | _: DateType => + Seq(LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.LEGACY) + case _ => + Seq(LegacyBehaviorPolicy.CORRECTED) + } + for { + dictionaryEnabled <- Seq(true, false) + timestampRebaseMode <- timestampRebaseModes + } + withClue( + s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " + + s"'$timestampRebaseMode''") { + withAllParquetWriters { + withTempDir { + dir => + val expected = + writeParquetFiles(dir, values, fromType, dictionaryEnabled, timestampRebaseMode) + if (expectError) { + val exception = intercept[SparkException] { + readParquetFiles(dir, toType).collect() + } + assert( + exception.getCause + .isInstanceOf[SchemaColumnConvertNotSupportedException] || + exception.getCause + .isInstanceOf[org.apache.parquet.io.ParquetDecodingException] || + exception.getCause.getMessage.contains("PARQUET_CONVERSION_FAILURE")) + } else { + checkAnswer(readParquetFiles(dir, toType), expected.select($"a".cast(toType))) + } + } + } + } + } + + private def readParquetFiles(dir: File, dataType: DataType): DataFrame = { + spark.read.schema(s"a ${dataType.sql}").parquet(dir.getAbsolutePath) + } + + private def writeParquetFiles( + dir: File, + values: Seq[String], + dataType: DataType, + dictionaryEnabled: Boolean, + timestampRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED) + : DataFrame = { + val repeatedValues = List.fill(if (dictionaryEnabled) 10 else 1)(values).flatten + val df = repeatedValues.toDF("a").select(col("a").cast(dataType)) + withSQLConf( + ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString, + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> timestampRebaseMode.toString) { + df.write.mode("overwrite").parquet(dir.getAbsolutePath) + } + + if (dictionaryEnabled && 
!DecimalType.isByteArrayDecimalType(dataType)) { + assertAllParquetFilesDictionaryEncoded(dir) + } + + val isParquetV2 = spark.conf + .getOption(ParquetOutputFormat.WRITER_VERSION) + .contains(ParquetProperties.WriterVersion.PARQUET_2_0.toString) + if (isParquetV2) { + if (dictionaryEnabled) { + assertParquetV2Encoding(dir, Encoding.PLAIN) + } else if (DecimalType.is64BitDecimalType(dataType)) { + assertParquetV2Encoding(dir, Encoding.DELTA_BINARY_PACKED) + } else if (DecimalType.isByteArrayDecimalType(dataType)) { + assertParquetV2Encoding(dir, Encoding.DELTA_BYTE_ARRAY) + } + } + df + } + + private def assertAllParquetFilesDictionaryEncoded(dir: File): Unit = { + dir.listFiles(_.getName.endsWith(".parquet")).foreach { + file => + val parquetMetadata = ParquetFileReader.readFooter( + spark.sessionState.newHadoopConf(), + new Path(dir.toString, file.getName), + ParquetMetadataConverter.NO_FILTER) + parquetMetadata.getBlocks.forEach { + block => + block.getColumns.forEach { + col => + assert( + col.hasDictionaryPage, + "This test covers dictionary encoding but column " + + s"'${col.getPath.toDotString}' in the test data is not dictionary encoded.") + } + } + } + } + + private def assertParquetV2Encoding(dir: File, expected_encoding: Encoding): Unit = { + dir.listFiles(_.getName.endsWith(".parquet")).foreach { + file => + val parquetMetadata = ParquetFileReader.readFooter( + spark.sessionState.newHadoopConf(), + new Path(dir.toString, file.getName), + ParquetMetadataConverter.NO_FILTER) + parquetMetadata.getBlocks.forEach { + block => + block.getColumns.forEach { + col => + assert( + col.getEncodings.contains(expected_encoding), + s"Expected column '${col.getPath.toDotString}' " + + s"to use encoding $expected_encoding " + + s"but found ${col.getEncodings}." + ) + } + } + } + } + + // ====== Override tests ====== + // Velox native reader always behaves like Spark's vectorized reader (no parquet-mr fallback). 
+ // In the parent tests, `expectError` is conditional on PARQUET_VECTORIZED_READER_ENABLED: + // parquet-mr allows conversions that the vectorized reader rejects. + // Since Velox always rejects, we override with expectError = true. + + for { + (values: Seq[String], fromType: DataType, toType: DecimalType) <- Seq( + (Seq("1", "2"), ByteType, DecimalType(1, 0)), + (Seq("1", "2"), ByteType, ByteDecimal), + (Seq("1", "2"), ShortType, ByteDecimal), + (Seq("1", "2"), ShortType, ShortDecimal), + (Seq("1", "2"), IntegerType, ShortDecimal), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision + 1, 1)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision + 1, 1)), + (Seq("1", "2"), LongType, IntDecimal), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision - 1, 0)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision - 1, 0)), + (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision - 1, 0)), + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision - 1, 0)), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision, 1)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision, 1)), + (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision, 1)), + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) + ) + } + testGluten(s"unsupported parquet conversion $fromType -> $toType") { + checkAllParquetReaders(values, fromType, toType, expectError = true) + } + + for { + (fromPrecision, toPrecision) <- + Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) + } + testGluten( + s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, 2), + toType = DecimalType(toPrecision, 2), + expectError = true) + } + + for { + ((fromPrecision, fromScale), (toPrecision, toScale)) <- + // Narrowing precision and scale by the same amount. 
+ Seq( + (7, 4) -> (5, 2), + (10, 7) -> (5, 2), + (20, 17) -> (5, 2), + (12, 4) -> (10, 2), + (20, 17) -> (10, 2), + (22, 4) -> (20, 2)) ++ + // Increasing precision and decreasing scale. + Seq((10, 6) -> (12, 4)) ++ + // Decreasing precision and increasing scale. + Seq((12, 4) -> (10, 6), (22, 5) -> (20, 7)) ++ + // Increasing precision by a smaller amount than scale. + Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7)) + } + testGluten( + s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + + s"Decimal($toPrecision, $toScale)") { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, fromScale), + toType = DecimalType(toPrecision, toScale), + expectError = true) + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 0dadfa1d0bd8..c13884555400 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -334,10 +334,12 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenParquetCommitterSuite] enableSuite[GlutenParquetFieldIdSchemaSuite] enableSuite[GlutenParquetTypeWideningSuite] + // Velox does not support DELTA_BYTE_ARRAY encoding for FIXED_LEN_BYTE_ARRAY decimals. .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(22, 2)") .exclude("parquet decimal precision and scale change Decimal(20, 7) -> Decimal(22, 5)") .exclude("parquet decimal precision and scale change Decimal(20, 5) -> Decimal(22, 8)") .exclude("parquet decimal precision and scale change Decimal(20, 2) -> Decimal(22, 4)") + // Velox native reader aligns with vectorized reader behavior, always rejecting incompatible decimal conversions. 
.exclude("parquet decimal precision and scale change Decimal(10, 4) -> Decimal(12, 7)") .exclude("parquet decimal precision and scale change Decimal(10, 6) -> Decimal(12, 4)") .exclude("parquet decimal precision and scale change Decimal(10, 7) -> Decimal(5, 2)") @@ -349,22 +351,12 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("parquet decimal precision and scale change Decimal(22, 5) -> Decimal(20, 7)") .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(6, 4)") .exclude("parquet decimal precision and scale change Decimal(7, 4) -> Decimal(5, 2)") - .exclude("parquet decimal precision and scale change Decimal(10, 2) -> Decimal(12, 4)") - .exclude("parquet decimal precision and scale change Decimal(10, 2) -> Decimal(20, 12)") - .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(10, 7)") - .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(20, 17)") - .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(7, 4)") .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(5, 2)") .exclude("parquet decimal precision change Decimal(12, 2) -> Decimal(10, 2)") .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(10, 2)") .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(5, 2)") .exclude("parquet decimal precision change Decimal(22, 2) -> Decimal(20, 2)") .exclude("parquet decimal precision change Decimal(7, 2) -> Decimal(5, 2)") - .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(12, 2)") - .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(20, 2)") - .exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(10, 2)") - .exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(20, 2)") - .exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(7, 2)") .exclude("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with 
parquet-mr") .exclude("unsupported parquet conversion ByteType -> DecimalType(1,0)") .exclude("unsupported parquet conversion ByteType -> DecimalType(2,0)") @@ -374,29 +366,14 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("unsupported parquet conversion IntegerType -> DecimalType(10,1)") .exclude("unsupported parquet conversion IntegerType -> DecimalType(5,0)") .exclude("unsupported parquet conversion IntegerType -> DecimalType(9,0)") - .exclude("unsupported parquet conversion LongType -> DateType") .exclude("unsupported parquet conversion LongType -> DecimalType(10,0)") .exclude("unsupported parquet conversion LongType -> DecimalType(19,0)") .exclude("unsupported parquet conversion LongType -> DecimalType(20,1)") - .exclude("unsupported parquet conversion LongType -> IntegerType") .exclude("unsupported parquet conversion ShortType -> DecimalType(3,0)") .exclude("unsupported parquet conversion ShortType -> DecimalType(4,0)") .exclude("unsupported parquet conversion ShortType -> DecimalType(5,0)") .exclude("unsupported parquet conversion ShortType -> DecimalType(5,1)") .exclude("unsupported parquet conversion ShortType -> DecimalType(6,1)") - .exclude("parquet widening conversion ByteType -> DecimalType(11,1)") - .exclude("parquet widening conversion ByteType -> DecimalType(20,0)") - .exclude("parquet widening conversion IntegerType -> DecimalType(11,1)") - .exclude("parquet widening conversion IntegerType -> DecimalType(20,0)") - .exclude("parquet widening conversion IntegerType -> DecimalType(38,0)") - .exclude("parquet widening conversion IntegerType -> DoubleType") - .exclude("parquet widening conversion LongType -> DecimalType(20,0)") - .exclude("parquet widening conversion LongType -> DecimalType(21,1)") - .exclude("parquet widening conversion LongType -> DecimalType(38,0)") - .exclude("parquet widening conversion ShortType -> DecimalType(11,1)") - .exclude("parquet widening conversion ShortType -> DecimalType(20,0)") - .exclude("parquet 
widening conversion ShortType -> DecimalType(38,0)") - .exclude("parquet widening conversion ShortType -> DoubleType") // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure // Generated suites for org.apache.spark.sql.execution.datasources.text // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala index 2090b70f7727..b785dce559e3 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala @@ -16,6 +16,227 @@ */ package org.apache.spark.sql.execution.datasources.parquet -import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.gluten.config.GlutenConfig -class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait {} +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait} +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.DecimalType.{ByteDecimal, IntDecimal, LongDecimal, ShortDecimal} + +import org.apache.hadoop.fs.Path +import org.apache.parquet.column.{Encoding, ParquetProperties} +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat} + +import java.io.File + +class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait { + + import 
testImplicits._ + + // Disable native writer so that writeParquetFiles() uses Spark's Parquet writer. + // This suite tests the READ path. The native writer doesn't produce + // DELTA_BINARY_PACKED/DELTA_BYTE_ARRAY encodings that the parent test's + // V2 encoding assertions expect. + override def sparkConf: SparkConf = + super.sparkConf.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "false") + + // ====== Private methods copied from ParquetTypeWideningSuite ====== + // These are private in the parent class, so we must copy them to use in overridden tests. + // The key change: removed withAllParquetReaders wrapper since Velox native reader + // always behaves like the vectorized reader. + + private def checkAllParquetReaders( + values: Seq[String], + fromType: DataType, + toType: DataType, + expectError: Boolean): Unit = { + val timestampRebaseModes = toType match { + case _: TimestampNTZType | _: DateType => + Seq(LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.LEGACY) + case _ => + Seq(LegacyBehaviorPolicy.CORRECTED) + } + for { + dictionaryEnabled <- Seq(true, false) + timestampRebaseMode <- timestampRebaseModes + } + withClue( + s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " + + s"'$timestampRebaseMode''") { + withAllParquetWriters { + withTempDir { + dir => + val expected = + writeParquetFiles(dir, values, fromType, dictionaryEnabled, timestampRebaseMode) + if (expectError) { + val exception = intercept[SparkException] { + readParquetFiles(dir, toType).collect() + } + assert( + exception.getCause + .isInstanceOf[SchemaColumnConvertNotSupportedException] || + exception.getCause + .isInstanceOf[org.apache.parquet.io.ParquetDecodingException] || + exception.getCause.getMessage.contains("PARQUET_CONVERSION_FAILURE")) + } else { + checkAnswer(readParquetFiles(dir, toType), expected.select($"a".cast(toType))) + } + } + } + } + } + + private def readParquetFiles(dir: File, dataType: DataType): DataFrame = { + spark.read.schema(s"a 
${dataType.sql}").parquet(dir.getAbsolutePath) + } + + private def writeParquetFiles( + dir: File, + values: Seq[String], + dataType: DataType, + dictionaryEnabled: Boolean, + timestampRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED) + : DataFrame = { + val repeatedValues = List.fill(if (dictionaryEnabled) 10 else 1)(values).flatten + val df = repeatedValues.toDF("a").select(col("a").cast(dataType)) + withSQLConf( + ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString, + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> timestampRebaseMode.toString) { + df.write.mode("overwrite").parquet(dir.getAbsolutePath) + } + + if (dictionaryEnabled && !DecimalType.isByteArrayDecimalType(dataType)) { + assertAllParquetFilesDictionaryEncoded(dir) + } + + val isParquetV2 = spark.conf + .getOption(ParquetOutputFormat.WRITER_VERSION) + .contains(ParquetProperties.WriterVersion.PARQUET_2_0.toString) + if (isParquetV2) { + if (dictionaryEnabled) { + assertParquetV2Encoding(dir, Encoding.PLAIN) + } else if (DecimalType.is64BitDecimalType(dataType)) { + assertParquetV2Encoding(dir, Encoding.DELTA_BINARY_PACKED) + } else if (DecimalType.isByteArrayDecimalType(dataType)) { + assertParquetV2Encoding(dir, Encoding.DELTA_BYTE_ARRAY) + } + } + df + } + + private def assertAllParquetFilesDictionaryEncoded(dir: File): Unit = { + dir.listFiles(_.getName.endsWith(".parquet")).foreach { + file => + val parquetMetadata = ParquetFileReader.readFooter( + spark.sessionState.newHadoopConf(), + new Path(dir.toString, file.getName), + ParquetMetadataConverter.NO_FILTER) + parquetMetadata.getBlocks.forEach { + block => + block.getColumns.forEach { + col => + assert( + col.hasDictionaryPage, + "This test covers dictionary encoding but column " + + s"'${col.getPath.toDotString}' in the test data is not dictionary encoded.") + } + } + } + } + + private def assertParquetV2Encoding(dir: File, expected_encoding: Encoding): Unit = { + 
dir.listFiles(_.getName.endsWith(".parquet")).foreach { + file => + val parquetMetadata = ParquetFileReader.readFooter( + spark.sessionState.newHadoopConf(), + new Path(dir.toString, file.getName), + ParquetMetadataConverter.NO_FILTER) + parquetMetadata.getBlocks.forEach { + block => + block.getColumns.forEach { + col => + assert( + col.getEncodings.contains(expected_encoding), + s"Expected column '${col.getPath.toDotString}' " + + s"to use encoding $expected_encoding " + + s"but found ${col.getEncodings}." + ) + } + } + } + } + + // ====== Override tests ====== + // Velox native reader always behaves like Spark's vectorized reader (no parquet-mr fallback). + // In the parent tests, `expectError` is conditional on PARQUET_VECTORIZED_READER_ENABLED: + // parquet-mr allows conversions that the vectorized reader rejects. + // Since Velox always rejects, we override with expectError = true. + + for { + (values: Seq[String], fromType: DataType, toType: DecimalType) <- Seq( + (Seq("1", "2"), ByteType, DecimalType(1, 0)), + (Seq("1", "2"), ByteType, ByteDecimal), + (Seq("1", "2"), ShortType, ByteDecimal), + (Seq("1", "2"), ShortType, ShortDecimal), + (Seq("1", "2"), IntegerType, ShortDecimal), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision + 1, 1)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision + 1, 1)), + (Seq("1", "2"), LongType, IntDecimal), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision - 1, 0)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision - 1, 0)), + (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision - 1, 0)), + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision - 1, 0)), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision, 1)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision, 1)), + (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision, 1)), + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) + ) + } + testGluten(s"unsupported 
parquet conversion $fromType -> $toType") { + checkAllParquetReaders(values, fromType, toType, expectError = true) + } + + for { + (fromPrecision, toPrecision) <- + Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) + } + testGluten( + s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, 2), + toType = DecimalType(toPrecision, 2), + expectError = true) + } + + for { + ((fromPrecision, fromScale), (toPrecision, toScale)) <- + // Narrowing precision and scale by the same amount. + Seq( + (7, 4) -> (5, 2), + (10, 7) -> (5, 2), + (20, 17) -> (5, 2), + (12, 4) -> (10, 2), + (20, 17) -> (10, 2), + (22, 4) -> (20, 2)) ++ + // Increasing precision and decreasing scale. + Seq((10, 6) -> (12, 4)) ++ + // Decreasing precision and increasing scale. + Seq((12, 4) -> (10, 6), (22, 5) -> (20, 7)) ++ + // Increasing precision by a smaller amount than scale. + Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7)) + } + testGluten( + s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + + s"Decimal($toPrecision, $toScale)") { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, fromScale), + toType = DecimalType(toPrecision, toScale), + expectError = true) + } +}