Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

// When true, Parquet schema inference maps a column annotated with the UNKNOWN logical
// type to Spark's NullType; when false (the default), the annotation is ignored and the
// column's physical type (e.g. INT32) is used instead. Internal flag, disabled by default
// to preserve the pre-existing inference behavior.
// NOTE(review): the "4.1.2" version tag looks like a patch release — confirm it matches
// the release this config actually ships in.
val PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION =
buildConf("spark.sql.parquet.reader.respectUnknownTypeAnnotation.enabled")
.internal()
.doc("When enabled, respects the UNKNOWN type annotation in Parquet files during schema " +
"inference and infers NullType. When disabled, ignores the UNKNOWN annotation " +
"and uses the physical type instead.")
.version("4.1.2")
.booleanConf
.createWithDefault(false)

val PARQUET_FIELD_ID_READ_ENABLED =
buildConf("spark.sql.parquet.fieldId.read.enabled")
.doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers " +
Expand Down Expand Up @@ -7722,6 +7732,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def parquetIgnoreVariantAnnotation: Boolean = getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION)

// Accessor for PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION: whether Parquet schema
// inference honors the UNKNOWN logical type annotation (inferring NullType) instead of
// falling back to the column's physical type.
def parquetReaderRespectUnknownTypeAnnotation: Boolean =
getConf(SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION)

def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)

def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ class ParquetFileFormat
hadoopConf.setBoolean(
SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
sqlConf.legacyParquetNanosAsLong)
hadoopConf.setBoolean(
SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.key,
sqlConf.parquetReaderRespectUnknownTypeAnnotation)
}

/**
Expand Down Expand Up @@ -446,7 +449,9 @@ object ParquetFileFormat extends Logging {
sqlConf.isParquetBinaryAsString,
sqlConf.isParquetINT96AsTimestamp,
inferTimestampNTZ = sqlConf.parquetInferTimestampNTZEnabled,
nanosAsLong = sqlConf.legacyParquetNanosAsLong)
nanosAsLong = sqlConf.legacyParquetNanosAsLong,
respectUnknownTypeAnnotation =
sqlConf.parquetReaderRespectUnknownTypeAnnotation)

val seen = mutable.HashSet[String]()
val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
Expand Down Expand Up @@ -545,14 +550,17 @@ object ParquetFileFormat extends Logging {
val assumeInt96IsTimestamp = sqlConf.isParquetINT96AsTimestamp
val inferTimestampNTZ = sqlConf.parquetInferTimestampNTZEnabled
val nanosAsLong = sqlConf.legacyParquetNanosAsLong
val respectUnknownTypeAnnotation =
sqlConf.parquetReaderRespectUnknownTypeAnnotation

val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
inferTimestampNTZ = inferTimestampNTZ,
nanosAsLong = nanosAsLong)
nanosAsLong = nanosAsLong,
respectUnknownTypeAnnotation = respectUnknownTypeAnnotation)

readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
.map(ParquetFileFormat.readSchemaFromFooter(_, converter))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ class ParquetToSparkSchemaConverter(
nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get,
useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get,
val ignoreVariantAnnotation: Boolean =
SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get) {
SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get,
val respectUnknownTypeAnnotation: Boolean =
SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.defaultValue.get) {

def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
Expand All @@ -69,7 +71,9 @@ class ParquetToSparkSchemaConverter(
inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled,
nanosAsLong = conf.legacyParquetNanosAsLong,
useFieldId = conf.parquetFieldIdReadEnabled,
ignoreVariantAnnotation = conf.parquetIgnoreVariantAnnotation)
ignoreVariantAnnotation = conf.parquetIgnoreVariantAnnotation,
respectUnknownTypeAnnotation =
conf.parquetReaderRespectUnknownTypeAnnotation)

def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
Expand All @@ -80,7 +84,10 @@ class ParquetToSparkSchemaConverter(
useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key,
SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get),
ignoreVariantAnnotation = conf.getBoolean(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.key,
SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get))
SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION.defaultValue.get),
respectUnknownTypeAnnotation = conf.getBoolean(
SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.key,
SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.defaultValue.get))

/**
* Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
Expand Down Expand Up @@ -225,7 +232,11 @@ class ParquetToSparkSchemaConverter(
primitiveColumn: PrimitiveColumnIO,
sparkReadType: Option[DataType] = None): ParquetColumn = {
val parquetType = primitiveColumn.getType.asPrimitiveType()
val typeAnnotation = primitiveColumn.getType.getLogicalTypeAnnotation
val typeAnnotation = primitiveColumn.getType.getLogicalTypeAnnotation match {
case unknown: UnknownLogicalTypeAnnotation =>
if (respectUnknownTypeAnnotation) unknown else null
case other => other
}
val typeName = primitiveColumn.getPrimitive

def typeString =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ case class ParquetScan(
hadoopConf.setBoolean(
SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
conf.legacyParquetNanosAsLong)
hadoopConf.setBoolean(
SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.key,
conf.parquetReaderRespectUnknownTypeAnnotation)

val broadcastedConf =
SerializableConfiguration.broadcast(sparkSession.sparkContext, hadoopConf)
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

gridTest("Read external file with UNKNOWN type annotation")(
  Seq(true, false)
) { honorUnknown =>
  withSQLConf(
    SQLConf.PARQUET_READER_RESPECT_UNKNOWN_TYPE_ANNOTATION.key -> honorUnknown.toString
  ) {
    withAllParquetReaders {
      // The fixture's void_col carries physical type INT32 with an UNKNOWN logical
      // annotation and was written outside Spark, so reading it exercises the
      // Parquet-to-Spark schema conversion path end to end.
      val expectedType = if (honorUnknown) NullType else IntegerType
      val df = readResourceParquetFile("test-data/void_in_parquet.parquet")
      assert(df.schema("void_col").dataType == expectedType)
    }
  }
}

test("vectorized reader: missing all struct fields") {
for {
offheapEnabled <- Seq(true, false)
Expand Down