diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 100149a39211f..5a1990f8ec650 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1617,6 +1617,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION =
+    buildConf("spark.sql.parquet.reader.respectUnknownTypeAnnotation.enabled")
+      .internal()
+      .doc("When enabled, respects the UNKNOWN type annotation in Parquet files during schema " +
+        "inference and infers NullType. When disabled, ignores the UNKNOWN annotation " +
+        "and uses the physical type instead.")
+      .version("4.1.2")
+      .booleanConf
+      .createWithDefault(false)
+
   val PARQUET_FIELD_ID_READ_ENABLED =
     buildConf("spark.sql.parquet.fieldId.read.enabled")
       .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers " +
@@ -7722,6 +7732,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def parquetIgnoreVariantAnnotation: Boolean = getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION)
 
+  def parquetReaderRespectUnknownTypeAnnotation: Boolean =
+    getConf(SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION)
+
   def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)
 
   def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 08e545cb8c204..1fdb6be4b737c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -148,6 +148,9 @@ class ParquetFileFormat
     hadoopConf.setBoolean(
       SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
       sqlConf.legacyParquetNanosAsLong)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.key,
+      sqlConf.parquetReaderRespectUnknownTypeAnnotation)
   }
 
   /**
@@ -446,7 +449,9 @@ object ParquetFileFormat extends Logging {
       sqlConf.isParquetBinaryAsString,
       sqlConf.isParquetINT96AsTimestamp,
       inferTimestampNTZ = sqlConf.parquetInferTimestampNTZEnabled,
-      nanosAsLong = sqlConf.legacyParquetNanosAsLong)
+      nanosAsLong = sqlConf.legacyParquetNanosAsLong,
+      respectUnknownTypeAnnotation =
+        sqlConf.parquetReaderRespectUnknownTypeAnnotation)
 
     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -545,6 +550,8 @@ object ParquetFileFormat extends Logging {
     val assumeInt96IsTimestamp = sqlConf.isParquetINT96AsTimestamp
     val inferTimestampNTZ = sqlConf.parquetInferTimestampNTZEnabled
     val nanosAsLong = sqlConf.legacyParquetNanosAsLong
+    val respectUnknownTypeAnnotation =
+      sqlConf.parquetReaderRespectUnknownTypeAnnotation
 
     val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
       // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
@@ -552,7 +559,8 @@ object ParquetFileFormat extends Logging {
         assumeBinaryIsString = assumeBinaryIsString,
         assumeInt96IsTimestamp = assumeInt96IsTimestamp,
         inferTimestampNTZ = inferTimestampNTZ,
-        nanosAsLong = nanosAsLong)
+        nanosAsLong = nanosAsLong,
+        respectUnknownTypeAnnotation = respectUnknownTypeAnnotation)
 
       readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
         .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 9e6f4447ca792..d5b81a616d566 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -60,7 +60,9 @@ class ParquetToSparkSchemaConverter(
     nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get,
     useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get,
     val ignoreVariantAnnotation: Boolean =
-      SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get) {
+      SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get,
+    val respectUnknownTypeAnnotation: Boolean =
+      SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
@@ -69,7 +71,9 @@ class ParquetToSparkSchemaConverter(
     inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled,
     nanosAsLong = conf.legacyParquetNanosAsLong,
     useFieldId = conf.parquetFieldIdReadEnabled,
-    ignoreVariantAnnotation = conf.parquetIgnoreVariantAnnotation)
+    ignoreVariantAnnotation = conf.parquetIgnoreVariantAnnotation,
+    respectUnknownTypeAnnotation =
+      conf.parquetReaderRespectUnknownTypeAnnotation)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
@@ -80,7 +84,10 @@ class ParquetToSparkSchemaConverter(
     useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key,
       SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get),
     ignoreVariantAnnotation = conf.getBoolean(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.key,
-      SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get))
+      SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get),
+    respectUnknownTypeAnnotation = conf.getBoolean(
+      SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.key,
+      SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.defaultValue.get))
 
   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -225,7 +232,11 @@ class ParquetToSparkSchemaConverter(
       primitiveColumn: PrimitiveColumnIO,
       sparkReadType: Option[DataType] = None): ParquetColumn = {
     val parquetType = primitiveColumn.getType.asPrimitiveType()
-    val typeAnnotation = primitiveColumn.getType.getLogicalTypeAnnotation
+    val typeAnnotation = primitiveColumn.getType.getLogicalTypeAnnotation match {
+      case unknown: UnknownLogicalTypeAnnotation =>
+        if (respectUnknownTypeAnnotation) unknown else null
+      case other => other
+    }
     val typeName = primitiveColumn.getPrimitive
 
     def typeString =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 5a427aad5f895..d0c7859964e09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -161,6 +161,9 @@ case class ParquetScan(
     hadoopConf.setBoolean(
       SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
       conf.legacyParquetNanosAsLong)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.key,
+      conf.parquetReaderRespectUnknownTypeAnnotation)
 
     val broadcastedConf = SerializableConfiguration.broadcast(sparkSession.sparkContext,
       hadoopConf)
diff --git a/sql/core/src/test/resources/test-data/void_in_parquet.parquet b/sql/core/src/test/resources/test-data/void_in_parquet.parquet
new file mode 100644
index 0000000000000..d0c8db6dfc8cf
Binary files /dev/null and b/sql/core/src/test/resources/test-data/void_in_parquet.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 3072657a0954f..c0aa3dd22d44e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -765,6 +765,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  gridTest("Read external file with UNKNOWN type annotation")(
+    Seq(true, false)
+  ) { respectUnknown =>
+    withSQLConf(
+      SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.key ->
+        respectUnknown.toString
+    ) {
+      withAllParquetReaders {
+        // Parquet file column void_col has physical type INT32 and logical type UNKNOWN and was
+        // not written by Spark, so this goes through Parquet --> Spark type conversion.
+        val df = readResourceParquetFile("test-data/void_in_parquet.parquet")
+        val inferredType = df.schema("void_col").dataType
+        val expected = if (respectUnknown) NullType else IntegerType
+        assert(inferredType == expected)
+      }
+    }
+  }
+
   test("vectorized reader: missing all struct fields") {
     for {
       offheapEnabled <- Seq(true, false)