From 4ced84e77d6ce4bdf9f26166659d05fbf14d4297 Mon Sep 17 00:00:00 2001 From: vaibhawvipul Date: Sat, 4 Apr 2026 23:10:11 +0530 Subject: [PATCH 1/3] fix 3720 --- dev/diffs/3.4.3.diff | 88 +------------ dev/diffs/3.5.8.diff | 72 +---------- dev/diffs/4.0.1.diff | 74 +---------- .../spark/sql/comet/CometNativeScanExec.scala | 12 ++ .../spark/sql/comet/CometScanExec.scala | 11 ++ .../spark/sql/comet/CometScanUtils.scala | 117 ++++++++++++++++++ .../comet/shims/ShimParquetSchemaError.scala | 37 ++++++ .../comet/shims/ShimParquetSchemaError.scala | 37 ++++++ .../comet/shims/ShimParquetSchemaError.scala | 41 ++++++ .../comet/parquet/ParquetReadSuite.scala | 110 ++++++++++++++++ 10 files changed, 370 insertions(+), 229 deletions(-) create mode 100644 spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala create mode 100644 spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala create mode 100644 spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 29f49a03f3..60b2cfd45d 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2187,34 +2187,6 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ index 8670d95c65e..c7ba51f770f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -41,6 +41,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} - - import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} - import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} - import org.apache.spark.sql.catalyst.util.DateTimeUtils -@@ -1064,7 +1065,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { -+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i.toString)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) - -@@ -1075,7 +1077,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: int as long should throw schema incompatible error") { -+ test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) - @@ -1335,7 +1338,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2229,24 +2201,6 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ index 29cb224c878..ee5a87fa200 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat - - import org.apache.spark.{DebugFilesystem, SparkConf, SparkException} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} - import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow - import org.apache.spark.sql.catalyst.util.ArrayData -@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 1000).map { i => - val ts = new java.sql.Timestamp(i) - Row(ts) @@ -978,7 +980,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } @@ -2257,16 +2211,6 @@ index 29cb224c878..ee5a87fa200 100644 withAllParquetReaders { withTempPath { path => // Repeated values for dictionary encoding. -@@ -1031,7 +1034,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") - } - -- test("SPARK-34212 Parquet should read decimals correctly") { -+ test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - def readParquet(schema: String, path: File): DataFrame = { - spark.read.schema(schema).parquet(path.toString) - } @@ -1047,7 +1051,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema, path), df) } @@ -2287,16 +2231,6 @@ index 29cb224c878..ee5a87fa200 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) -@@ -1113,7 +1119,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - // The vectorized and non-vectorized readers will produce different exceptions, we don't need @@ -1128,7 +1135,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS .where(s"a < ${Long.MaxValue}") .collect() @@ -2389,7 +2323,7 @@ index bf5c51b89bb..4e2f0bdb389 100644 import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException -+import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion} ++import org.apache.spark.sql.IgnoreComet import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -2403,26 +2337,6 @@ index bf5c51b89bb..4e2f0bdb389 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" -@@ -1026,7 +1028,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("schema mismatch failure error message for parquet vectorized reader") { -+ test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { dir => - val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) - assert(e.getCause.isInstanceOf[SparkException]) -@@ -1067,7 +1070,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { -+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - import testImplicits._ - - withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index 3a0bd35cb70..b28f06a757f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 6787d89dff..654abe9fae 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -2135,26 +2135,6 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ index 8ed9ef1630e..a865928c1b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { -+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i.toString)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) - -@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: int as long should throw schema incompatible error") { -+ test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) - @@ -1345,7 +1347,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2169,16 +2149,6 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ index f6472ba3d9d..5ea2d938664 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 1000).map { i => - val ts = new java.sql.Timestamp(i) - Row(ts) @@ -998,7 +999,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } @@ -2189,16 +2159,6 @@ index f6472ba3d9d..5ea2d938664 100644 withAllParquetReaders { withTempPath { path => // Repeated values for dictionary encoding. -@@ -1051,7 +1053,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") - } - -- test("SPARK-34212 Parquet should read decimals correctly") { -+ test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - def readParquet(schema: String, path: File): DataFrame = { - spark.read.schema(schema).parquet(path.toString) - } @@ -1067,7 +1070,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema, path), df) } @@ -2219,16 +2179,6 @@ index f6472ba3d9d..5ea2d938664 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) -@@ -1133,7 +1138,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - // The vectorized and non-vectorized readers will produce different exceptions, we don't need @@ -1148,7 +1154,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS .where(s"a < ${Long.MaxValue}") .collect() @@ -2321,7 +2271,7 @@ index 3f47c5e506f..f1ce3194279 100644 import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException -+import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion} ++import org.apache.spark.sql.IgnoreComet import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -2335,26 +2285,6 @@ index 3f47c5e506f..f1ce3194279 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" -@@ -1046,7 +1048,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("schema mismatch failure error message for parquet vectorized reader") { -+ test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { dir => - val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) - assert(e.getCause.isInstanceOf[SparkException]) -@@ -1087,7 +1090,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { -+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - import testImplicits._ - - withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index b8f3ea3c6f3..bbd44221288 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index 15f7d8dcd8..76d83ec258 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -2921,25 +2921,7 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ index 4474ec1fd42..05fa0257c82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -39,6 +39,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} - - import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import org.apache.spark.sql.catalyst.InternalRow - import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} - import org.apache.spark.sql.catalyst.util.DateTimeUtils -@@ -1059,7 +1060,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { -+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i.toString)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) - -@@ -1344,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1344,7 +1344,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2961,17 +2943,7 @@ index bba71f1c48d..0b52574e0f3 100644 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.util.ArrayData -@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) - - Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -318,7 +318,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } @@ -2994,16 +2966,6 @@ index bba71f1c48d..0b52574e0f3 100644 } } } -@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") - } - -- test("SPARK-34212 Parquet should read decimals correctly") { -+ test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - def readParquet(schema: String, path: File): DataFrame = { - spark.read.schema(schema).parquet(path.toString) - } @@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -3024,16 +2986,6 @@ index bba71f1c48d..0b52574e0f3 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - withAllParquetReaders { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index 30503af0fab..1491f4bc2d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -3169,7 +3121,7 @@ index 0acb21f3e6f..1f9c3fd13fc 100644 import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, Row} -+import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row} import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -3183,26 +3135,6 @@ index 0acb21f3e6f..1f9c3fd13fc 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" -@@ -1046,7 +1047,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("schema mismatch failure error message for parquet vectorized reader") { -+ test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { dir => - val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) - assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) -@@ -1079,7 +1081,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { -+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - import testImplicits._ - - withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala index 09ed6955a51..6f9174c9545 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index dcb975ac7a..fd322701d4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -146,6 +146,18 @@ case class CometNativeScanExec( // Get file partitions from CometScanExec (handles bucketing, etc.) val filePartitions = scan.getFilePartitions() + // Validate per-file schema compatibility before native execution. + // This must run here (not in doExecuteColumnar) because when CometNativeScanExec + // is wrapped by a parent CometNativeExec, the parent's doExecuteColumnar() runs + // the entire plan in native code and CometNativeScanExec.doExecuteColumnar() is + // never called. This lazy val IS always evaluated via commonData/perPartitionData. + CometScanUtils.validatePerFileSchemaCompatibility( + relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options), + requiredSchema, + relation.partitionSchema.fieldNames.toSet, + relation.sparkSession.sessionState.conf.caseSensitiveAnalysis, + filePartitions) + // Serialize each partition's files import org.apache.comet.serde.operator.partition2Proto val perPartitionBytes = filePartitions.map { filePartition => diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index 9afffe20bc..f401f01a16 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -201,6 +201,17 @@ case class CometScanExec( } lazy val inputRDD: RDD[InternalRow] = { + // Validate per-file schema compatibility before reading any data. + // This must run here (not in doExecuteColumnar) because when CometScanExec is + // wrapped by a parent native operator (CometScanWrapper), the parent reads from + // inputRDD directly via JNI and doExecuteColumnar() is never called. + CometScanUtils.validatePerFileSchemaCompatibility( + relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options), + requiredSchema, + relation.partitionSchema.fieldNames.toSet, + relation.sparkSession.sessionState.conf.caseSensitiveAnalysis, + getFilePartitions()) + val options = relation.options + (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString) val readFile: (PartitionedFile) => Iterator[InternalRow] = diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala index 4cd3996669..dc698dc953 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala @@ -19,7 +19,16 @@ package org.apache.spark.sql.comet +import scala.jdk.CollectionConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.schema.Type.Repetition import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression, Literal} +import org.apache.spark.sql.comet.shims.ShimParquetSchemaError +import org.apache.spark.sql.execution.datasources.{FilePartition, SchemaColumnConvertNotSupportedException} +import org.apache.spark.sql.types._ + +import org.apache.comet.parquet.{FooterReader, TypeUtil} object CometScanUtils { @@ -30,4 +39,112 @@ object CometScanUtils { def filterUnusedDynamicPruningExpressions(predicates: Seq[Expression]): Seq[Expression] = { predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)) } + + /** + * Validate per-file schema compatibility by reading actual Parquet file metadata. + * + * For each file in the scan, reads the Parquet footer and validates each required column + * against the actual file schema. This catches mismatches that table-level schema checks miss + * (e.g., when `spark.read.schema(...)` specifies a type incompatible with the file). + * + * For primitive columns, delegates to [[TypeUtil.checkParquetType]] which mirrors Spark's + * `ParquetVectorUpdaterFactory` logic and is version-aware (Spark 4 type promotion). For + * complex types, checks kind-level mismatches (e.g., reading a scalar as an array). + */ + def validatePerFileSchemaCompatibility( + hadoopConf: Configuration, + requiredSchema: StructType, + partitionColumnNames: Set[String], + caseSensitive: Boolean, + filePartitions: Seq[FilePartition]): Unit = { + + for { + partition <- filePartitions + file <- partition.files + } { + val filePath = file.filePath.toString() + val footer = FooterReader.readFooter(hadoopConf, file) + val fileSchema = footer.getFileMetaData.getSchema + + requiredSchema.fields.foreach { field => + val fieldName = field.name + // Skip partition columns - their values come from directory paths, not the file + val isPartitionCol = if (caseSensitive) { + partitionColumnNames.contains(fieldName) + } else { + partitionColumnNames.exists(_.equalsIgnoreCase(fieldName)) + } + if (!isPartitionCol) { + val parquetFieldOpt = { + val fields = fileSchema.getFields.asScala + if (caseSensitive) fields.find(_.getName == fieldName) + else fields.find(_.getName.equalsIgnoreCase(fieldName)) + } + + parquetFieldOpt.foreach { parquetField => + field.dataType match { + case _: ArrayType => + // A REPEATED primitive/group is a valid legacy 2-level Parquet array. + // Only reject when the file has a non-repeated primitive (genuine scalar). + if (parquetField.isPrimitive && + parquetField.getRepetition != Repetition.REPEATED) { + throwSchemaMismatch( + filePath, + fieldName, + field.dataType.catalogString, + parquetField.asPrimitiveType.getPrimitiveTypeName.toString) + } + + case _: StructType | _: MapType => + // Read schema expects struct/map; file must have a group type + if (parquetField.isPrimitive) { + throwSchemaMismatch( + filePath, + fieldName, + field.dataType.catalogString, + parquetField.asPrimitiveType.getPrimitiveTypeName.toString) + } + + case _ => + if (parquetField.isPrimitive) { + // Primitive -> Primitive: use TypeUtil.checkParquetType (Spark's full logic) + try { + val descriptor = + fileSchema.getColumnDescription(Array(parquetField.getName)) + if (descriptor != null) { + TypeUtil.checkParquetType(descriptor, field.dataType) + } + } catch { + case scnse: SchemaColumnConvertNotSupportedException => + throw ShimParquetSchemaError.parquetColumnMismatchError( + filePath, + fieldName, + field.dataType.catalogString, + scnse.getPhysicalType, + scnse) + } + } else { + // File has complex type, read schema expects primitive -> mismatch + throwSchemaMismatch(filePath, fieldName, field.dataType.catalogString, "group") + } + } + } + } + } + } + } + + private def throwSchemaMismatch( + filePath: String, + column: String, + expectedType: String, + actualType: String): Unit = { + val scnse = new SchemaColumnConvertNotSupportedException(column, actualType, expectedType) + throw ShimParquetSchemaError.parquetColumnMismatchError( + filePath, + column, + expectedType, + actualType, + scnse) + } } diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala new file mode 100644 index 0000000000..cd954f3aa2 --- /dev/null +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.shims + +import org.apache.spark.SparkException +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException + +object ShimParquetSchemaError { + def parquetColumnMismatchError( + filePath: String, + column: String, + expectedType: String, + actualType: String, + cause: SchemaColumnConvertNotSupportedException): SparkException = { + new SparkException( + s"Parquet column cannot be converted in file $filePath. " + + s"Column: [$column], Expected: $expectedType, Found: $actualType", + cause) + } +} diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala new file mode 100644 index 0000000000..cd954f3aa2 --- /dev/null +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.shims + +import org.apache.spark.SparkException +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException + +object ShimParquetSchemaError { + def parquetColumnMismatchError( + filePath: String, + column: String, + expectedType: String, + actualType: String, + cause: SchemaColumnConvertNotSupportedException): SparkException = { + new SparkException( + s"Parquet column cannot be converted in file $filePath. " + + s"Column: [$column], Expected: $expectedType, Found: $actualType", + cause) + } +} diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala new file mode 100644 index 0000000000..43a9cc0789 --- /dev/null +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.shims + +import org.apache.spark.SparkException +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException + +object ShimParquetSchemaError { + def parquetColumnMismatchError( + filePath: String, + column: String, + expectedType: String, + actualType: String, + cause: SchemaColumnConvertNotSupportedException): SparkException = { + new SparkException( + errorClass = "FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH", + messageParameters = Map( + "path" -> filePath, + "column" -> s"[$column]", + "expectedType" -> expectedType, + "actualType" -> actualType), + cause = cause) + } +} diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 75ac889228..c9d3c2e7e0 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -1576,4 +1577,113 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper { } } + test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { + val data = (1 to 4).map(i => Tuple1(i.toString)) + val readSchema = StructType(Seq(StructField("_1", TimestampType))) + + withParquetDataFrame(data) { _ => + withTempPath { dir => + spark + .createDataFrame( + spark.sparkContext.parallelize(data.map(Row.fromTuple)), + StructType(Seq(StructField("_1", StringType)))) + .write + .parquet(dir.getCanonicalPath) + + val e = intercept[SparkException] { + spark.read.schema(readSchema).parquet(dir.getCanonicalPath).collect() + } + // Verify SchemaColumnConvertNotSupportedException is somewhere in the cause chain + var cause: Throwable = e + var found = false + while (cause != null && !found) { + if (cause.isInstanceOf[SchemaColumnConvertNotSupportedException]) { + found = true + } + cause = cause.getCause + } + assert( + found, + s"Expected SchemaColumnConvertNotSupportedException in cause chain, got: $e") + } + } + } + + test("SPARK-45604: schema mismatch on timestamp_ntz to array") { + import org.apache.spark.sql.functions.lit + withTempPath { dir => + val path = dir.getCanonicalPath + // Write a file with scalar timestamp_ntz column + val df1 = spark + .range(1) + .selectExpr("CAST(id AS INT) AS _1") + .withColumn("_2", lit("2024-01-01T00:00:00").cast(TimestampNTZType)) + // Write another file with array column + val arraySchema = StructType( + Seq(StructField("_1", IntegerType), StructField("_2", ArrayType(TimestampNTZType)))) + val df2Row = Row(2, Array(java.time.LocalDateTime.of(2024, 1, 1, 0, 0))) + val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(df2Row)), arraySchema) + df1.write.mode("overwrite").parquet(s"$path/parquet") + df2.write.mode("append").parquet(s"$path/parquet") + + val e = intercept[SparkException] { + spark.read.schema(arraySchema).parquet(s"$path/parquet").collect() + } + var cause: Throwable = e + var found = false + while (cause != null && !found) { + if (cause.isInstanceOf[SchemaColumnConvertNotSupportedException]) { + found = true + } + cause = cause.getCause + } + assert(found, s"Expected SchemaColumnConvertNotSupportedException in cause chain, got: $e") + } + } + + test("schema mismatch: Comet should match Spark behavior for incompatible type reads") { + // Spark 4 is more permissive than Spark 3 for some of these, so we verify Comet + // matches Spark rather than asserting a specific outcome. + val cases: Seq[(DataType, DataType, String)] = Seq( + (IntegerType, StringType, "int-as-string"), + (StringType, IntegerType, "string-as-int"), + (BooleanType, IntegerType, "boolean-as-int"), + (IntegerType, TimestampType, "int-as-timestamp"), + (DoubleType, IntegerType, "double-as-int")) + + Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { + scanMode => + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) { + cases.foreach { case (writeType, readType, desc) => + withTempPath { path => + val writeSchema = StructType(Seq(StructField("col", writeType, true))) + val rows = (0 until 10).map { i => + val v: Any = writeType match { + case IntegerType => i + case StringType => s"str_$i" + case BooleanType => i % 2 == 0 + case DoubleType => i.toDouble + } + Row(v) + } + spark + .createDataFrame(spark.sparkContext.parallelize(rows), writeSchema) + .write + .parquet(path.getCanonicalPath) + + val readSchema = StructType(Seq(StructField("col", readType, true))) + readParquetFile(path.getCanonicalPath, Some(readSchema)) { df => + val (sparkError, cometError) = checkSparkAnswerMaybeThrows(df) + assert( + sparkError.isDefined == cometError.isDefined, + s"[$scanMode] $desc: Spark " + + s"${if (sparkError.isDefined) "errored" else "succeeded"}" + + s" but Comet ${if (cometError.isDefined) "errored" else "succeeded"}") + } + } + } + } + } + } + } From 89b5c4c73656e2ea4897238e8781fb74f9559615 Mon Sep 17 00:00:00 2001 From: vaibhawvipul Date: Sun, 5 Apr 2026 14:33:07 +0530 Subject: [PATCH 2/3] fix failing tests for spark 4.0 --- .../org/apache/comet/parquet/TypeUtil.java | 3 +- .../apache/comet/rules/CometScanRule.scala | 32 +++++++++++++++++-- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java index 87cecdc65d..e1502ff076 100644 --- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java +++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java @@ -130,7 +130,8 @@ public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkT PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName(); LogicalTypeAnnotation logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); - boolean allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get(); + boolean allowTypePromotion = + isSpark40Plus() || (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get(); if (sparkType instanceof NullType) { return; diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index dc0e7099c9..9b92fa304d 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -28,13 +28,14 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.conf.Configuration import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, InputFileBlockLength, InputFileBlockStart, InputFileName, PlanExpression} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues -import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec} +import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec, CometScanUtils} import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SparkPlan, SubqueryAdaptiveBroadcastExec} -import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation} import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan @@ -166,13 +167,38 @@ case class CometScanRule(session: SparkSession) return scanExec } - COMET_NATIVE_SCAN_IMPL.get() match { + val result = COMET_NATIVE_SCAN_IMPL.get() match { case SCAN_AUTO | SCAN_NATIVE_DATAFUSION => nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec) case SCAN_NATIVE_ICEBERG_COMPAT => nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec) } + // Validate per-file schema compatibility regardless of which scan path was chosen. + // When the scan falls back to FileSourceScanExec (e.g., because native_datafusion + // doesn't support the query), DataFusion's Parquet reader may silently coerce + // incompatible types. This check reads Parquet footers to catch mismatches early. + // Use the relation's file listing to get all files across all partitions. + val allFiles = r.location.listFiles(Seq.empty, Seq.empty).flatMap { partDir => + partDir.files.map { fs => + org.apache.spark.sql.execution.datasources.PartitionedFile( + InternalRow.empty, + org.apache.spark.paths.SparkPath.fromPathString(fs.getPath.toString), + 0, + fs.getLen) + } + } + if (allFiles.nonEmpty) { + CometScanUtils.validatePerFileSchemaCompatibility( + hadoopConf, + scanExec.requiredSchema, + r.partitionSchema.fieldNames.toSet, + r.sparkSession.sessionState.conf.caseSensitiveAnalysis, + Seq(FilePartition(0, allFiles.toArray))) + } + + result + case _ => withInfo(scanExec, s"Unsupported relation ${scanExec.relation}") } From d766a28fccb9521e95c7d1a014885e043b0f7157 Mon Sep 17 00:00:00 2001 From: vaibhawvipul Date: Mon, 6 Apr 2026 15:55:16 +0530 Subject: [PATCH 3/3] fix: skip TypeUtil schema validation for unsupported types (intervals, collated strings) --- .../spark/sql/comet/CometScanUtils.scala | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala index dc698dc953..aa7d656949 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala @@ -107,7 +107,12 @@ object CometScanUtils { case _ => if (parquetField.isPrimitive) { - // Primitive -> Primitive: use TypeUtil.checkParquetType (Spark's full logic) + // Primitive -> Primitive: use TypeUtil.checkParquetType which mirrors + // Spark's ParquetVectorUpdaterFactory logic. + // TypeUtil may not cover all Spark types (e.g., intervals, collated + // strings). Only rethrow for types TypeUtil explicitly knows about + // (basic primitives, decimals, timestamps). For unknown types, let + // the execution path handle errors with proper context. try { val descriptor = fileSchema.getColumnDescription(Array(parquetField.getName)) @@ -115,13 +120,16 @@ object CometScanUtils { TypeUtil.checkParquetType(descriptor, field.dataType) } } catch { - case scnse: SchemaColumnConvertNotSupportedException => + case scnse: SchemaColumnConvertNotSupportedException + if isTypeUtilKnownType(field.dataType) => throw ShimParquetSchemaError.parquetColumnMismatchError( filePath, fieldName, field.dataType.catalogString, scnse.getPhysicalType, scnse) + case _: SchemaColumnConvertNotSupportedException => + // TypeUtil doesn't know this type - skip, let execution handle it } } else { // File has complex type, read schema expects primitive -> mismatch @@ -134,6 +142,20 @@ object CometScanUtils { } } + /** + * Returns true if TypeUtil.checkParquetType covers this Spark type. For types it doesn't know + * (intervals, collated strings, etc.), we skip rethrow and let the execution path handle + * errors. + */ + private def isTypeUtilKnownType(dt: DataType): Boolean = dt match { + case BooleanType | ByteType | ShortType | IntegerType | LongType => true + case FloatType | DoubleType => true + case StringType | BinaryType => true + case DateType | TimestampType => true + case _: DecimalType => true + case _ => false + } + private def throwSchemaMismatch( filePath: String, column: String,