From 34135a85ff417414b785bf87d3466f3e269c28a6 Mon Sep 17 00:00:00 2001 From: Ziya Mukhtarov Date: Wed, 18 Mar 2026 16:44:22 +0000 Subject: [PATCH 1/2] Reapply "[SPARK-56045][SQL] Add flag for ignoring Parquet UNKNOWN type annotation and revert to old behavior" This reverts commit fd2d262c57ae3b51c14932b6cf975c14cca86112. --- .../apache/spark/sql/internal/SQLConf.scala | 14 +++++++++++++ .../parquet/ParquetFileFormat.scala | 12 +++++++++-- .../parquet/ParquetSchemaConverter.scala | 19 ++++++++++++++---- .../datasources/v2/parquet/ParquetScan.scala | 3 +++ .../test-data/void_in_parquet.parquet | Bin 0 -> 2022 bytes .../datasources/parquet/ParquetIOSuite.scala | 18 +++++++++++++++++ 6 files changed, 60 insertions(+), 6 deletions(-) create mode 100644 sql/core/src/test/resources/test-data/void_in_parquet.parquet diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 100149a39211f..5855ea48a2f50 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1617,6 +1617,17 @@ object SQLConf { .booleanConf .createWithDefault(false) + val PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION = + buildConf("spark.sql.parquet.reader.respectUnknownTypeAnnotation.enabled") + .internal() + .doc("When enabled, respects the UNKNOWN type annotation in Parquet files during schema " + + "inference and infers NullType. When disabled, ignores the UNKNOWN annotation " + + "and uses the physical type instead.") + .version("4.1.2") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .booleanConf + .createWithDefault(false) + val PARQUET_FIELD_ID_READ_ENABLED = buildConf("spark.sql.parquet.fieldId.read.enabled") .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers " + @@ -7722,6 +7733,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def parquetIgnoreVariantAnnotation: Boolean = getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION) + def parquetReaderRespectUnknownTypeAnnotation: Boolean = + getConf(SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION) + def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID) def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 08e545cb8c204..1fdb6be4b737c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -148,6 +148,9 @@ class ParquetFileFormat hadoopConf.setBoolean( SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, sqlConf.legacyParquetNanosAsLong) + hadoopConf.setBoolean( + SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.key, + sqlConf.parquetReaderRespectUnknownTypeAnnotation) } /** @@ -446,7 +449,9 @@ object ParquetFileFormat extends Logging { sqlConf.isParquetBinaryAsString, sqlConf.isParquetINT96AsTimestamp, inferTimestampNTZ = sqlConf.parquetInferTimestampNTZEnabled, - nanosAsLong = sqlConf.legacyParquetNanosAsLong) + nanosAsLong = sqlConf.legacyParquetNanosAsLong, + respectUnknownTypeAnnotation = + sqlConf.parquetReaderRespectUnknownTypeAnnotation) val seen = mutable.HashSet[String]() val finalSchemas: Seq[StructType] = footers.flatMap { footer => @@ -545,6 +550,8 @@ object ParquetFileFormat extends Logging { val assumeInt96IsTimestamp = sqlConf.isParquetINT96AsTimestamp val inferTimestampNTZ = sqlConf.parquetInferTimestampNTZEnabled val nanosAsLong = sqlConf.legacyParquetNanosAsLong + val respectUnknownTypeAnnotation = + sqlConf.parquetReaderRespectUnknownTypeAnnotation val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => { // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` @@ -552,7 +559,8 @@ object ParquetFileFormat extends Logging { assumeBinaryIsString = assumeBinaryIsString, assumeInt96IsTimestamp = assumeInt96IsTimestamp, inferTimestampNTZ = inferTimestampNTZ, - nanosAsLong = nanosAsLong) + nanosAsLong = nanosAsLong, + respectUnknownTypeAnnotation = respectUnknownTypeAnnotation) readParquetFootersInParallel(conf, files, ignoreCorruptFiles) .map(ParquetFileFormat.readSchemaFromFooter(_, converter)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index 9e6f4447ca792..d5b81a616d566 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -60,7 +60,9 @@ class ParquetToSparkSchemaConverter( nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get, useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get, val ignoreVariantAnnotation: Boolean = - SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get) { + SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get, + val respectUnknownTypeAnnotation: Boolean = + SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.defaultValue.get) { def this(conf: SQLConf) = this( assumeBinaryIsString = conf.isParquetBinaryAsString, @@ -69,7 +71,9 @@ class ParquetToSparkSchemaConverter( inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled, nanosAsLong = conf.legacyParquetNanosAsLong, useFieldId = conf.parquetFieldIdReadEnabled, - ignoreVariantAnnotation = conf.parquetIgnoreVariantAnnotation) + ignoreVariantAnnotation = conf.parquetIgnoreVariantAnnotation, + respectUnknownTypeAnnotation = + conf.parquetReaderRespectUnknownTypeAnnotation) def this(conf: Configuration) = this( assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean, @@ -80,7 +84,10 @@ class ParquetToSparkSchemaConverter( useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key, SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get), ignoreVariantAnnotation = conf.getBoolean(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.key, - SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get)) + SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get), + respectUnknownTypeAnnotation = conf.getBoolean( + SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.key, + SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.defaultValue.get)) /** * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]]. @@ -225,7 +232,11 @@ class ParquetToSparkSchemaConverter( primitiveColumn: PrimitiveColumnIO, sparkReadType: Option[DataType] = None): ParquetColumn = { val parquetType = primitiveColumn.getType.asPrimitiveType() - val typeAnnotation = primitiveColumn.getType.getLogicalTypeAnnotation + val typeAnnotation = primitiveColumn.getType.getLogicalTypeAnnotation match { + case unknown: UnknownLogicalTypeAnnotation => + if (respectUnknownTypeAnnotation) unknown else null + case other => other + } val typeName = primitiveColumn.getPrimitive def typeString = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala index 5a427aad5f895..d0c7859964e09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala @@ -161,6 +161,9 @@ case class ParquetScan( hadoopConf.setBoolean( SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, conf.legacyParquetNanosAsLong) + hadoopConf.setBoolean( + SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.key, + conf.parquetReaderRespectUnknownTypeAnnotation) val broadcastedConf = SerializableConfiguration.broadcast(sparkSession.sparkContext, hadoopConf) diff --git a/sql/core/src/test/resources/test-data/void_in_parquet.parquet b/sql/core/src/test/resources/test-data/void_in_parquet.parquet new file mode 100644 index 0000000000000000000000000000000000000000..d0c8db6dfc8cf1932f36ea1d8fa2786c084cbe2b GIT binary patch literal 2022 zcmb7F+iv1U7&dIg(yFSW)fyQo7m>9^twd66AT6uy#Xt&}hHP-MHh5JNLiLVaO6D!l`auZm33G5`Q3<&|d{m%3-$f5NItsk&FK z*Qh;M)uu|jRh&o{E2YX`h05QBnpi9UAJNr%CKLR!79BypeV1SH=K zb}ijAG6k{Yr|W>HU7LD@l{sdl8fv6M`Ymg^kZ!qiy!PEi|KNM}ZbQtr3f404`Yjc? z?|}--jgDHR3R@c1?hOgJKzDUelD`&ve>NUdZAKmtnvNf(j|rcAX~IYf9++z61*8tP zr3Gq`;xhQD8U$W~(N<;U311W znX_@O=1QT~!+eUb$T!O={{-)#58C@5r%kb~53nX=8FJeP{H1a(U~bUJC8pGAnA0x7 z6`YQxI?}b#Bk@R?iVMJK%+5jYFtil8eXa4LuQ@uwlO%C)axu5A9%$E7$nQr`zk6NC zQss$--%oLDTN{X1n$wR!#}Vl8m5eW$K}~1V(<@_WC-*VUo$#kw{0Y5wY#h%TS?xL7 z81!MixLDU{*a)%i4K7~eZKxOQXNKvhvJ@))&bqdOtE^O6yu$lHaNREKFYF<=Stlvd zhhjV5b55If+naUP_|XSNKK&0}`Cd0iHt1dHR#snaU|YJ=PtB|@i7EF}-RU$l9g3~H zr@qFwNbScL%Xr*t-3TtcV{i1Sc8-+lyuH#eW=}{@u7!A + withSQLConf( + SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.key -> + respectUnknown.toString + ) { + withAllParquetReaders { + // Parquet file column void_col has physical type INT32 and logical type UNKNOWN and was + // not written by Spark, so this goes through Parquet --> Spark type conversion. + val df = readResourceParquetFile("test-data/void_in_parquet.parquet") + val inferredType = df.schema("void_col").dataType + val expected = if (respectUnknown) NullType else IntegerType + assert(inferredType == expected) + } + } + } + test("vectorized reader: missing all struct fields") { for { offheapEnabled <- Seq(true, false) From 3ef0dff5b82116390643e4fc45629f11404aef68 Mon Sep 17 00:00:00 2001 From: Ziya Mukhtarov Date: Wed, 18 Mar 2026 16:44:39 +0000 Subject: [PATCH 2/2] Delete config binding policy --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5855ea48a2f50..5a1990f8ec650 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1624,7 +1624,6 @@ object SQLConf { "inference and infers NullType. When disabled, ignores the UNKNOWN annotation " + "and uses the physical type instead.") .version("4.1.2") - .withBindingPolicy(ConfigBindingPolicy.SESSION) .booleanConf .createWithDefault(false)