Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1495,6 +1495,31 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
// The columns present in the table, if not available default to the baseSchema.
auto tableSchema = splitInfo->tableSchema ? splitInfo->tableSchema : baseSchema;

// Build dataColumns from tableSchema, excluding partition columns.
// HiveTableHandle::dataColumns() is used as fileSchema for the reader.
// Partition columns should not be validated against the file's physical types
// (their values come from the partition path, not from the file).
std::unordered_set<std::string> partitionColNames;
for (int idx = 0; idx < colNameList.size(); idx++) {
if (columnTypes[idx] == ColumnType::kPartitionKey) {
partitionColNames.insert(colNameList[idx]);
}
}
RowTypePtr dataColumns;
if (partitionColNames.empty()) {
dataColumns = tableSchema;
} else {
std::vector<std::string> dataColNames;
std::vector<TypePtr> dataColTypes;
for (int idx = 0; idx < tableSchema->size(); idx++) {
if (partitionColNames.find(tableSchema->nameOf(idx)) == partitionColNames.end()) {
dataColNames.push_back(tableSchema->nameOf(idx));
dataColTypes.push_back(tableSchema->childAt(idx));
}
}
dataColumns = ROW(std::move(dataColNames), std::move(dataColTypes));
}

connector::ConnectorTableHandlePtr tableHandle;
auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr;
auto connectorId = kHiveConnectorId;
Expand All @@ -1506,7 +1531,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
}
common::SubfieldFilters subfieldFilters;
tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
connectorId, "hive_table", std::move(subfieldFilters), remainingFilter, tableSchema);
connectorId, "hive_table", std::move(subfieldFilters), remainingFilter, dataColumns);

// Get assignments and out names.
std::vector<std::string> outNames;
Expand Down
4 changes: 2 additions & 2 deletions ep/build-velox/src/get-velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
set -exu

CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
VELOX_REPO=https://github.com/IBM/velox.git
VELOX_BRANCH=dft-2026_03_10-iceberg
VELOX_REPO=https://github.com/baibaichen/velox.git
VELOX_BRANCH=pr3/parquet-type-widening
VELOX_ENHANCED_BRANCH=ibm-2026_03_10
VELOX_HOME=""
RUN_SETUP_SCRIPT=ON
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,12 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenParquetCommitterSuite]
enableSuite[GlutenParquetFieldIdSchemaSuite]
enableSuite[GlutenParquetTypeWideningSuite]
// Velox does not support DELTA_BYTE_ARRAY encoding for FIXED_LEN_BYTE_ARRAY decimals.
.exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(22, 2)")
.exclude("parquet decimal precision and scale change Decimal(20, 7) -> Decimal(22, 5)")
.exclude("parquet decimal precision and scale change Decimal(20, 5) -> Decimal(22, 8)")
.exclude("parquet decimal precision and scale change Decimal(20, 2) -> Decimal(22, 4)")
// Velox native reader aligns with vectorized reader behavior, always rejecting incompatible decimal conversions.
.exclude("parquet decimal precision and scale change Decimal(10, 4) -> Decimal(12, 7)")
.exclude("parquet decimal precision and scale change Decimal(10, 6) -> Decimal(12, 4)")
.exclude("parquet decimal precision and scale change Decimal(10, 7) -> Decimal(5, 2)")
Expand All @@ -338,22 +340,12 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("parquet decimal precision and scale change Decimal(22, 5) -> Decimal(20, 7)")
.exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(6, 4)")
.exclude("parquet decimal precision and scale change Decimal(7, 4) -> Decimal(5, 2)")
.exclude("parquet decimal precision and scale change Decimal(10, 2) -> Decimal(12, 4)")
.exclude("parquet decimal precision and scale change Decimal(10, 2) -> Decimal(20, 12)")
.exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(10, 7)")
.exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(20, 17)")
.exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(7, 4)")
.exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(5, 2)")
.exclude("parquet decimal precision change Decimal(12, 2) -> Decimal(10, 2)")
.exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(10, 2)")
.exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(5, 2)")
.exclude("parquet decimal precision change Decimal(22, 2) -> Decimal(20, 2)")
.exclude("parquet decimal precision change Decimal(7, 2) -> Decimal(5, 2)")
.exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(12, 2)")
.exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(20, 2)")
.exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(10, 2)")
.exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(20, 2)")
.exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(7, 2)")
.exclude("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr")
.exclude("unsupported parquet conversion ByteType -> DecimalType(1,0)")
.exclude("unsupported parquet conversion ByteType -> DecimalType(2,0)")
Expand All @@ -363,29 +355,14 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("unsupported parquet conversion IntegerType -> DecimalType(10,1)")
.exclude("unsupported parquet conversion IntegerType -> DecimalType(5,0)")
.exclude("unsupported parquet conversion IntegerType -> DecimalType(9,0)")
.exclude("unsupported parquet conversion LongType -> DateType")
.exclude("unsupported parquet conversion LongType -> DecimalType(10,0)")
.exclude("unsupported parquet conversion LongType -> DecimalType(19,0)")
.exclude("unsupported parquet conversion LongType -> DecimalType(20,1)")
.exclude("unsupported parquet conversion LongType -> IntegerType")
.exclude("unsupported parquet conversion ShortType -> DecimalType(3,0)")
.exclude("unsupported parquet conversion ShortType -> DecimalType(4,0)")
.exclude("unsupported parquet conversion ShortType -> DecimalType(5,0)")
.exclude("unsupported parquet conversion ShortType -> DecimalType(5,1)")
.exclude("unsupported parquet conversion ShortType -> DecimalType(6,1)")
.exclude("parquet widening conversion ByteType -> DecimalType(11,1)")
.exclude("parquet widening conversion ByteType -> DecimalType(20,0)")
.exclude("parquet widening conversion IntegerType -> DecimalType(11,1)")
.exclude("parquet widening conversion IntegerType -> DecimalType(20,0)")
.exclude("parquet widening conversion IntegerType -> DecimalType(38,0)")
.exclude("parquet widening conversion IntegerType -> DoubleType")
.exclude("parquet widening conversion LongType -> DecimalType(20,0)")
.exclude("parquet widening conversion LongType -> DecimalType(21,1)")
.exclude("parquet widening conversion LongType -> DecimalType(38,0)")
.exclude("parquet widening conversion ShortType -> DecimalType(11,1)")
.exclude("parquet widening conversion ShortType -> DecimalType(20,0)")
.exclude("parquet widening conversion ShortType -> DecimalType(38,0)")
.exclude("parquet widening conversion ShortType -> DoubleType")
enableSuite[GlutenParquetVariantShreddingSuite]
// Generated suites for org.apache.spark.sql.execution.datasources.text
// TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,227 @@
*/
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.gluten.config.GlutenConfig

class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait {}
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait}
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DecimalType.{ByteDecimal, IntDecimal, LongDecimal, ShortDecimal}

import org.apache.hadoop.fs.Path
import org.apache.parquet.column.{Encoding, ParquetProperties}
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}

import java.io.File

/**
 * Gluten/Velox variant of `ParquetTypeWideningSuite`.
 *
 * Velox's native Parquet reader always behaves like Spark's vectorized reader — there is no
 * parquet-mr fallback path. In the parent suite, `expectError` is conditional on
 * `PARQUET_VECTORIZED_READER_ENABLED` (parquet-mr allows conversions the vectorized reader
 * rejects). Since Velox always rejects, the affected tests are re-registered here via
 * `testGluten` with `expectError = true`; the originals are excluded in `VeloxTestSettings`.
 */
class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait {

  import testImplicits._

  // Disable native writer so that writeParquetFiles() uses Spark's Parquet writer.
  // This suite tests the READ path. The native writer doesn't produce
  // DELTA_BINARY_PACKED/DELTA_BYTE_ARRAY encodings that the parent test's
  // V2 encoding assertions expect.
  override def sparkConf: SparkConf =
    super.sparkConf.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "false")

  // ====== Private methods copied from ParquetTypeWideningSuite ======
  // These are private in the parent class, so we must copy them to use in overridden tests.
  // The key change: removed withAllParquetReaders wrapper since Velox native reader
  // always behaves like the vectorized reader.

  /**
   * Writes `values` as `fromType` and reads them back as `toType`, for every combination of
   * dictionary encoding on/off and (for date/timestamp target types) timestamp rebase mode.
   *
   * @param expectError when true, the read must fail with a schema-conversion error;
   *                    otherwise the rows read back must equal `values` cast to `toType`.
   */
  private def checkAllParquetReaders(
      values: Seq[String],
      fromType: DataType,
      toType: DataType,
      expectError: Boolean): Unit = {
    // LEGACY rebase mode is only exercised for date/timestamp target types.
    val timestampRebaseModes = toType match {
      case _: TimestampNTZType | _: DateType =>
        Seq(LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.LEGACY)
      case _ =>
        Seq(LegacyBehaviorPolicy.CORRECTED)
    }
    for {
      dictionaryEnabled <- Seq(true, false)
      timestampRebaseMode <- timestampRebaseModes
    }
      withClue(
        s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " +
          // NOTE: parent suite had a stray trailing apostrophe here ("...Mode''"); fixed.
          s"'$timestampRebaseMode'") {
        withAllParquetWriters {
          withTempDir {
            dir =>
              val expected =
                writeParquetFiles(dir, values, fromType, dictionaryEnabled, timestampRebaseMode)
              if (expectError) {
                val exception = intercept[SparkException] {
                  readParquetFiles(dir, toType).collect()
                }
                // The rejection may surface as Spark's schema-conversion exception, a Parquet
                // decoding exception, or a Velox PARQUET_CONVERSION_FAILURE error message.
                assert(
                  exception.getCause
                    .isInstanceOf[SchemaColumnConvertNotSupportedException] ||
                    exception.getCause
                      .isInstanceOf[org.apache.parquet.io.ParquetDecodingException] ||
                    exception.getCause.getMessage.contains("PARQUET_CONVERSION_FAILURE"))
              } else {
                checkAnswer(readParquetFiles(dir, toType), expected.select($"a".cast(toType)))
              }
          }
        }
      }
  }

  /** Reads the Parquet files in `dir`, forcing column `a` to be interpreted as `dataType`. */
  private def readParquetFiles(dir: File, dataType: DataType): DataFrame = {
    spark.read.schema(s"a ${dataType.sql}").parquet(dir.getAbsolutePath)
  }

  /**
   * Writes `values` cast to `dataType` into `dir` as Parquet and returns the written DataFrame.
   *
   * When `dictionaryEnabled` is set, values are repeated 10x so the writer actually emits a
   * dictionary page, and the result is verified to be dictionary encoded. For Parquet V2
   * writers, the expected non-dictionary encoding (DELTA_BINARY_PACKED / DELTA_BYTE_ARRAY,
   * depending on the decimal's physical type) is also asserted.
   */
  private def writeParquetFiles(
      dir: File,
      values: Seq[String],
      dataType: DataType,
      dictionaryEnabled: Boolean,
      timestampRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED)
      : DataFrame = {
    val repeatedValues = List.fill(if (dictionaryEnabled) 10 else 1)(values).flatten
    val df = repeatedValues.toDF("a").select(col("a").cast(dataType))
    withSQLConf(
      ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString,
      SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> timestampRebaseMode.toString) {
      df.write.mode("overwrite").parquet(dir.getAbsolutePath)
    }

    // Byte-array decimals may exceed the dictionary size threshold, so skip the check for them.
    if (dictionaryEnabled && !DecimalType.isByteArrayDecimalType(dataType)) {
      assertAllParquetFilesDictionaryEncoded(dir)
    }

    val isParquetV2 = spark.conf
      .getOption(ParquetOutputFormat.WRITER_VERSION)
      .contains(ParquetProperties.WriterVersion.PARQUET_2_0.toString)
    if (isParquetV2) {
      if (dictionaryEnabled) {
        assertParquetV2Encoding(dir, Encoding.PLAIN)
      } else if (DecimalType.is64BitDecimalType(dataType)) {
        assertParquetV2Encoding(dir, Encoding.DELTA_BINARY_PACKED)
      } else if (DecimalType.isByteArrayDecimalType(dataType)) {
        assertParquetV2Encoding(dir, Encoding.DELTA_BYTE_ARRAY)
      }
    }
    df
  }

  /** Asserts that every column chunk of every Parquet file in `dir` has a dictionary page. */
  private def assertAllParquetFilesDictionaryEncoded(dir: File): Unit = {
    dir.listFiles(_.getName.endsWith(".parquet")).foreach {
      file =>
        val parquetMetadata = ParquetFileReader.readFooter(
          spark.sessionState.newHadoopConf(),
          new Path(dir.toString, file.getName),
          ParquetMetadataConverter.NO_FILTER)
        parquetMetadata.getBlocks.forEach {
          block =>
            block.getColumns.forEach {
              col =>
                assert(
                  col.hasDictionaryPage,
                  "This test covers dictionary encoding but column " +
                    s"'${col.getPath.toDotString}' in the test data is not dictionary encoded.")
            }
        }
    }
  }

  /** Asserts that every column chunk of every Parquet file in `dir` uses `expectedEncoding`. */
  private def assertParquetV2Encoding(dir: File, expectedEncoding: Encoding): Unit = {
    dir.listFiles(_.getName.endsWith(".parquet")).foreach {
      file =>
        val parquetMetadata = ParquetFileReader.readFooter(
          spark.sessionState.newHadoopConf(),
          new Path(dir.toString, file.getName),
          ParquetMetadataConverter.NO_FILTER)
        parquetMetadata.getBlocks.forEach {
          block =>
            block.getColumns.forEach {
              col =>
                assert(
                  col.getEncodings.contains(expectedEncoding),
                  s"Expected column '${col.getPath.toDotString}' " +
                    s"to use encoding $expectedEncoding " +
                    s"but found ${col.getEncodings}."
                )
            }
        }
    }
  }

  // ====== Override tests ======
  // Velox native reader always behaves like Spark's vectorized reader (no parquet-mr fallback).
  // In the parent tests, `expectError` is conditional on PARQUET_VECTORIZED_READER_ENABLED:
  // parquet-mr allows conversions that the vectorized reader rejects.
  // Since Velox always rejects, we override with expectError = true.
  // NOTE: test names below must match the parent's exactly — they are excluded by name in
  // VeloxTestSettings.

  for {
    (values: Seq[String], fromType: DataType, toType: DecimalType) <- Seq(
      (Seq("1", "2"), ByteType, DecimalType(1, 0)),
      (Seq("1", "2"), ByteType, ByteDecimal),
      (Seq("1", "2"), ShortType, ByteDecimal),
      (Seq("1", "2"), ShortType, ShortDecimal),
      (Seq("1", "2"), IntegerType, ShortDecimal),
      (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision + 1, 1)),
      (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision + 1, 1)),
      (Seq("1", "2"), LongType, IntDecimal),
      (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision - 1, 0)),
      (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision - 1, 0)),
      (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision - 1, 0)),
      (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision - 1, 0)),
      (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision, 1)),
      (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision, 1)),
      (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision, 1)),
      (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
    )
  }
    testGluten(s"unsupported parquet conversion $fromType -> $toType") {
      checkAllParquetReaders(values, fromType, toType, expectError = true)
    }

  for {
    (fromPrecision, toPrecision) <-
      Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
  }
    testGluten(
      s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") {
      checkAllParquetReaders(
        values = Seq("1.23", "10.34"),
        fromType = DecimalType(fromPrecision, 2),
        toType = DecimalType(toPrecision, 2),
        expectError = true)
    }

  for {
    ((fromPrecision, fromScale), (toPrecision, toScale)) <-
      // Narrowing precision and scale by the same amount.
      Seq(
        (7, 4) -> (5, 2),
        (10, 7) -> (5, 2),
        (20, 17) -> (5, 2),
        (12, 4) -> (10, 2),
        (20, 17) -> (10, 2),
        (22, 4) -> (20, 2)) ++
        // Increasing precision and decreasing scale.
        Seq((10, 6) -> (12, 4)) ++
        // Decreasing precision and increasing scale.
        Seq((12, 4) -> (10, 6), (22, 5) -> (20, 7)) ++
        // Increasing precision by a smaller amount than scale.
        Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7))
  }
    testGluten(
      s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " +
        s"Decimal($toPrecision, $toScale)") {
      checkAllParquetReaders(
        values = Seq("1.23", "10.34"),
        fromType = DecimalType(fromPrecision, fromScale),
        toType = DecimalType(toPrecision, toScale),
        expectError = true)
    }
}
Loading
Loading