From fa4bce72fa1c023caf8d8f02677ef77741572fa1 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 18 Mar 2026 12:31:16 +0100 Subject: [PATCH 1/6] [SPARK-56046][SQL] Typed SPJ partition key reducers --- .../connector/catalog/functions/Reducer.java | 9 + .../plans/physical/partitioning.scala | 30 +++- .../apache/spark/sql/internal/SQLConf.scala | 11 ++ .../connector/catalog/InMemoryBaseTable.scala | 6 +- .../datasources/v2/GroupPartitionsExec.scala | 12 +- .../exchange/EnsureRequirements.scala | 64 ++++++- .../KeyGroupedPartitioningSuite.scala | 164 +++++++++++++++++- .../functions/transformFunctions.scala | 121 +++++++++++-- 8 files changed, 384 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java index 561d66092d641..6772b454efc7f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java @@ -17,6 +17,7 @@ package org.apache.spark.sql.connector.catalog.functions; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.types.DataType; /** * A 'reducer' for output of user-defined functions. @@ -39,4 +40,12 @@ @Evolving public interface Reducer { O reduce(I arg); + + /** + * Returns the {@link DataType data type} of values produced by this function. + * It can return null to signal it doesn't change the input type. + * + * @return a data type for values produced by this function. 
+ */ + default DataType resultType() { return null; } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 28a9225b6ce23..7d1c80e3f903e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -464,11 +464,15 @@ case class KeyedPartitioning( KeyedPartitioning.projectKeys(partitionKeys, expressionDataTypes, positions) /** - * Reduces this partitioning's partition keys by applying the given reducers. + * Reduces this partitioning's partition keys by applying the given reducers and use the provided + * types for comparison. * Returns the distinct reduced keys. */ - def reduceKeys(reducers: Seq[Option[Reducer[_, _]]]): Seq[InternalRowComparableWrapper] = - KeyedPartitioning.reduceKeys(partitionKeys, expressionDataTypes, reducers).distinct + def reduceKeys( + reducers: Seq[Option[Reducer[_, _]]], + reducedDataTypes: Seq[DataType]): Seq[InternalRowComparableWrapper] = + KeyedPartitioning.reduceKeys(partitionKeys, expressionDataTypes, reducers, reducedDataTypes) + .distinct override def satisfies0(required: Distribution): Boolean = { nonGroupedSatisfies(required) || groupedSatisfies(required) @@ -581,14 +585,28 @@ object KeyedPartitioning { } /** - * Reduces a sequence of partition keys by applying reducers to each position. + * Reduces a sequence of data types by applying reducers to each position. 
+ */ + def reduceTypes( + dataTypes: Seq[DataType], + reducers: Seq[Option[Reducer[_, _]]]): Seq[DataType] = { + dataTypes.zip(reducers).map { + case (t, Some(reducer: Reducer[Any, Any])) => Option(reducer.resultType()).getOrElse(t) + case (t, _) => t + } + } + + /** + * Reduces a sequence of partition keys by applying reducers to each position and using the + * provided types for comparison. */ def reduceKeys( keys: Seq[InternalRowComparableWrapper], dataTypes: Seq[DataType], - reducers: Seq[Option[Reducer[_, _]]]): Seq[InternalRowComparableWrapper] = { + reducers: Seq[Option[Reducer[_, _]]], + reducedDataTypes: Seq[DataType]): Seq[InternalRowComparableWrapper] = { val comparableKeyWrapperFactory = - InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(dataTypes) + InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(reducedDataTypes) keys.map { key => val keyValues = key.row.toSeq(dataTypes) val reducedKey = keyValues.zip(reducers).map { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fbb67c6d44dcf..5cf833d4cdf9d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2133,6 +2133,14 @@ object SQLConf { .booleanConf .createWithDefault(false) + val V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES = + buildConf("spark.sql.legacy.allowIncompatibleTransformTypes.enabled") + .doc("Whether to allow storage-partition join where the partition transforms produce " + + "incompatible reduced types and use the left partition key type for comparison.") + .version("4.2.0") + .booleanConf + .createWithDefault(true) + val V2_BUCKETING_PARTITION_FILTER_ENABLED = buildConf("spark.sql.sources.v2.bucketing.partition.filter.enabled") .doc(s"Whether to filter partitions when running storage-partition join. 
" + @@ -7692,6 +7700,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def v2BucketingAllowCompatibleTransforms: Boolean = getConf(SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS) + def v2BucketingAllowIncompatibleTransformTypes: Boolean = + getConf(SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES) + def v2BucketingAllowSorting: Boolean = getConf(SQLConf.V2_BUCKETING_SORTING_ENABLED) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index 407d592f82199..e7762565f47ec 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -203,10 +203,10 @@ abstract class InMemoryBaseTable( case YearsTransform(ref) => extractor(ref.fieldNames, cleanedSchema, row) match { case (days: Int, DateType) => - ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, DateTimeUtils.daysToLocalDate(days)) + ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, DateTimeUtils.daysToLocalDate(days)).toInt case (micros: Long, TimestampType) => val localDate = DateTimeUtils.microsToInstant(micros).atZone(UTC).toLocalDate - ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate) + ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate).toInt case (v, t) => throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)") } @@ -225,7 +225,7 @@ abstract class InMemoryBaseTable( case (days, DateType) => days case (micros: Long, TimestampType) => - ChronoUnit.DAYS.between(Instant.EPOCH, DateTimeUtils.microsToInstant(micros)) + ChronoUnit.DAYS.between(Instant.EPOCH, DateTimeUtils.microsToInstant(micros)).toInt case (v, t) => throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)") } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala index d5260cad9c6ec..2a3d7596219e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala @@ -141,15 +141,21 @@ case class GroupPartitionsExec( )(keyedPartitioning.projectKeys) // Reduce keys if reducers are specified - val reducedKeys = reducers.fold(projectedKeys)( - KeyedPartitioning.reduceKeys(projectedKeys, projectedDataTypes, _)) + val (reducedDataTypes, reducedKeys) = reducers match { + case Some(reducers) => + val reducedDataTypes = KeyedPartitioning.reduceTypes(projectedDataTypes, reducers) + val reducedKeys = KeyedPartitioning.reduceKeys(projectedKeys, projectedDataTypes, reducers, + reducedDataTypes) + (reducedDataTypes, reducedKeys) + case _ => (projectedDataTypes, projectedKeys) + } val keyToPartitionIndices = reducedKeys.zipWithIndex.groupMap(_._1)(_._2) if (expectedPartitionKeys.isDefined) { alignToExpectedKeys(keyToPartitionIndices) } else { - (groupAndSortByKeys(keyToPartitionIndices, projectedDataTypes), true) + (groupAndSortByKeys(keyToPartitionIndices, reducedDataTypes), true) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index cca37558584f0..22da36842b0c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -20,17 +20,20 @@ package org.apache.spark.sql.execution.exchange import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import org.apache.spark.SparkException import 
org.apache.spark.internal.{LogKeys} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.types.PhysicalDataType import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper import org.apache.spark.sql.connector.catalog.functions.Reducer import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.v2.GroupPartitionsExec import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.DataType /** * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]] @@ -509,11 +512,27 @@ case class EnsureRequirements( // in case of compatible but not identical partition expressions, we apply 'reduce' // transforms to group one side's partitions as well as the common partition values val leftReducers = leftSpec.reducers(rightSpec) - val leftReducedKeys = - leftReducers.fold(leftPartitioning.partitionKeys)(leftPartitioning.reduceKeys) val rightReducers = rightSpec.reducers(leftSpec) - val rightReducedKeys = - rightReducers.fold(rightPartitioning.partitionKeys)(rightPartitioning.reduceKeys) + val leftReducedDataTypes = leftReducers.fold(leftPartitioning.expressionDataTypes)( + KeyedPartitioning.reduceTypes(leftPartitioning.expressionDataTypes, _)) + val rightReducedDataTypes = rightReducers.fold(rightPartitioning.expressionDataTypes)( + KeyedPartitioning.reduceTypes(rightPartitioning.expressionDataTypes, _)) + if (leftReducedDataTypes != rightReducedDataTypes && ( + !conf.v2BucketingAllowIncompatibleTransformTypes || + leftReducedDataTypes.map(PhysicalDataType(_)) != + rightReducedDataTypes.map(PhysicalDataType(_)))) { + throw new SparkException("Storage-partition join partition transforms produced " + + s"incompatible reduced 
types, left: $leftReducedDataTypes, right: " + + s"$rightReducedDataTypes") + } + val commonDataTypes = leftReducedDataTypes + val leftReducedKeys = leftReducers.fold(leftPartitioning.partitionKeys)( + leftPartitioning.reduceKeys(_, commonDataTypes)) + // As we use left side reduced types as common types for comparison, the right side + // partition keys might need a new comparable wrapper (depending on the legacy flag) + val rightReducedKeys = rightReducers.fold( + rewrapKeys(rightPartitioning.partitionKeys, rightReducedDataTypes, commonDataTypes))( + rightPartitioning.reduceKeys(_, commonDataTypes)) // merge values on both sides var mergedPartitionKeys = @@ -628,10 +647,17 @@ case class EnsureRequirements( } } + val leftMergedPartitionKeys = mergedPartitionKeys + // As we used left side reduced types as common types for comparison, the merged partition + // keys that we push down to the right side might need a new comparable wrapper (depending + // on the legacy flag) + val rightMergedPartitionKeys = + rewrapKeyMap(mergedPartitionKeys, commonDataTypes, rightReducedDataTypes) + // Now we need to push-down the common partition information to the `GroupPartitionsExec`s. 
- newLeft = applyGroupPartitions(left, leftSpec.joinKeyPositions, mergedPartitionKeys, + newLeft = applyGroupPartitions(left, leftSpec.joinKeyPositions, leftMergedPartitionKeys, leftReducers, distributePartitions = applyPartialClustering && !replicateLeftSide) - newRight = applyGroupPartitions(right, rightSpec.joinKeyPositions, mergedPartitionKeys, + newRight = applyGroupPartitions(right, rightSpec.joinKeyPositions, rightMergedPartitionKeys, rightReducers, distributePartitions = applyPartialClustering && !replicateRightSide) } } @@ -656,6 +682,32 @@ case class EnsureRequirements( } } + private def rewrapKeys( + keys: Seq[InternalRowComparableWrapper], + currentDataTypes: Seq[DataType], + expectedDataType: Seq[DataType]) = { + if (currentDataTypes != expectedDataType) { + val comparableKeyWrapperFactory = + InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(expectedDataType) + keys.map(key => comparableKeyWrapperFactory(key.row)) + } else { + keys + } + } + + private def rewrapKeyMap( + keyMap: Seq[(InternalRowComparableWrapper, Int)], + currentDataTypes: Seq[DataType], + expectedDataType: Seq[DataType]) = { + if (currentDataTypes != expectedDataType) { + val comparableKeyWrapperFactory = + InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(expectedDataType) + keyMap.map { case (key, numParts) => (comparableKeyWrapperFactory(key.row), numParts) } + } else { + keyMap + } + } + // Similar to `OptimizeSkewedJoin.canSplitRightSide` private def canReplicateLeftSide(joinType: JoinType): Boolean = { joinType == Inner || joinType == Cross || joinType == RightOuter diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index f2eb02e03846e..e26ae8d7d7035 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector import java.sql.Timestamp import java.util.Collections -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression} @@ -75,6 +75,20 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with Column.create("dept_id", IntegerType), Column.create("data", StringType)) + def withFunction[T](fn: UnboundFunction)(f: => T): T = { + val id = Identifier.of(Array.empty, fn.name()) + val oldFn = Option.when(catalog.listFunctions(Array.empty).contains(id)) { + val fn = catalog.loadFunction(id) + catalog.dropFunction(id) + fn + } + catalog.createFunction(id, fn) + try f finally { + catalog.dropFunction(id) + oldFn.foreach(catalog.createFunction(id, _)) + } + } + test("clustered distribution: output partitioning should be KeyedPartitioning") { val partitions: Array[Transform] = Array(Expressions.years("ts")) @@ -88,7 +102,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with var df = sql(s"SELECT count(*) FROM testcat.ns.$table GROUP BY ts") val catalystDistribution = physical.ClusteredDistribution( Seq(TransformExpression(YearsFunction, Seq(attr("ts"))))) - val partitionKeys = Seq(50L, 51L, 52L).map(v => InternalRow.fromSeq(Seq(v))) + val partitionKeys = Seq(50, 51, 52).map(v => InternalRow.fromSeq(Seq(v))) checkQueryPlan(df, catalystDistribution, physical.KeyedPartitioning(catalystDistribution.clustering, partitionKeys)) @@ -3242,4 +3256,150 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with "ExpectedPartitionKeys: 2 Reducers: 1 DistributePartitions: false") } } + + test("SPARK-56046: Reducers with same logical result types") { + 
val items_partitions = Array(days("arrive_time")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(0, 'aa', 39.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 41.0, cast('2021-01-03' as timestamp)), " + + s"(3, 'bb', 42.0, cast('2021-01-04' as timestamp))") + + val purchases_partitions = Array(years("time")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(5, 44.0, cast('2020-01-15' as timestamp)), " + + s"(7, 46.5, cast('2021-02-08' as timestamp))") + + Seq(true, false).foreach { allowIncompatibleTransformTypes => + withSQLConf( + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> + allowIncompatibleTransformTypes.toString) { + Seq( + s"testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.time = i.arrive_time", + s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time" + ).foreach { joinSting => + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} id, item_id + |FROM $joinSting + |ORDER BY id, item_id + |""".stripMargin) + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + assert(shuffles.isEmpty, "should not add shuffle for both sides of the join") + val groupPartitions = collectGroupPartitions(df.queryExecution.executedPlan) + assert(groupPartitions.forall(_.outputPartitioning.numPartitions == 2)) + + checkAnswer(df, Seq(Row(0, 1), Row(1, 1))) + } + } + } + } + + test("SPARK-56046: Reducers with different logical but compatible physical result types") { + withFunction(UnboundDaysFunctionWithCompatiblePhysicalTypeReducer) { + val items_partitions = Array(days("arrive_time")) + createTable(items, itemsColumns, items_partitions) + 
sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(0, 'aa', 39.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 41.0, cast('2021-01-03' as timestamp)), " + + s"(3, 'bb', 42.0, cast('2021-01-04' as timestamp))") + + val purchases_partitions = Array(years("time")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(5, 44.0, cast('2020-01-15' as timestamp)), " + + s"(7, 46.5, cast('2021-02-08' as timestamp))") + + withSQLConf( + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") { + Seq( + s"testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.time = i.arrive_time", + s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time" + ).foreach { joinSting => + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} id, item_id + |FROM $joinSting + |ORDER BY id, item_id + |""".stripMargin) + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + assert(shuffles.isEmpty, "should not add shuffle for both sides of the join") + val groupPartitions = collectGroupPartitions(df.queryExecution.executedPlan) + assert(groupPartitions.forall(_.outputPartitioning.numPartitions == 2)) + + checkAnswer(df, Seq(Row(0, 1), Row(1, 1))) + + withSQLConf(SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> "false") { + val e = intercept[SparkException] { + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} id, item_id + |FROM $joinSting + |ORDER BY id, item_id + |""".stripMargin) + + df.collect() + } + assert(e.getMessage.startsWith( + "Storage-partition join partition transforms produced incompatible reduced types")) + } + } + } + } + } + + test("SPARK-56046: Reducers with different logical and incompatible physical result types") { + 
withFunction(UnboundDaysFunctionWithIncompatiblePhysicalTypeReducer) { + val items_partitions = Array(days("arrive_time")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(0, 'aa', 39.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 41.0, cast('2021-01-03' as timestamp)), " + + s"(3, 'bb', 42.0, cast('2021-01-04' as timestamp))") + + val purchases_partitions = Array(years("time")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(5, 44.0, cast('2020-01-15' as timestamp)), " + + s"(7, 46.5, cast('2021-02-08' as timestamp))") + + Seq(true, false).foreach { allowIncompatibleTransformTypes => + withSQLConf( + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> + allowIncompatibleTransformTypes.toString) { + Seq( + s"testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.time = i.arrive_time", + s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time" + ).foreach { joinSting => + val e = intercept[SparkException] { + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} id, item_id + |FROM $joinSting + |ORDER BY id, item_id + |""".stripMargin) + + df.collect() + } + assert(e.getMessage.startsWith( + "Storage-partition join partition transforms produced incompatible reduced types")) + } + } + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala index ed2f81d7e8d6f..8d592b16df004 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala @@ -40,42 +40,137 @@ object UnboundYearsFunction extends UnboundFunction { override def name(): String = "years" } -object YearsFunction extends ScalarFunction[Long] { +object YearsFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] { override def inputTypes(): Array[DataType] = Array(TimestampType) - override def resultType(): DataType = LongType + override def resultType(): DataType = IntegerType override def name(): String = "years" override def canonicalName(): String = name() val UTC: ZoneId = ZoneId.of("UTC") val EPOCH_LOCAL_DATE: LocalDate = Instant.EPOCH.atZone(UTC).toLocalDate - def invoke(ts: Long): Long = { + def invoke(ts: Long): Int = { val localDate = DateTimeUtils.microsToInstant(ts).atZone(UTC).toLocalDate - ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate) + ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate).toInt } + + override def reducer(otherFunction: ReducibleFunction[_, _]): Reducer[Int, Int] = null } -object DaysFunction extends BoundFunction { - override def inputTypes(): Array[DataType] = Array(TimestampType) - override def resultType(): DataType = LongType +abstract class UnboundDaysFunctionBase extends UnboundFunction { + protected def isValidType(dt: DataType): Boolean = dt match { + case DateType | TimestampType => true + case _ => false + } + + override def description(): String = name() override def name(): String = "days" - override def canonicalName(): String = name() } -object UnboundDaysFunction extends UnboundFunction { +object UnboundDaysFunction extends UnboundDaysFunctionBase { override def bind(inputType: StructType): BoundFunction = { if (inputType.size == 1 && isValidType(inputType.head.dataType)) DaysFunction else throw new UnsupportedOperationException( "'days' only take date or timestamp as input type") } +} - private def isValidType(dt: DataType): Boolean = dt match { - case DateType | 
TimestampType => true - case _ => false +object UnboundDaysFunctionWithCompatiblePhysicalTypeReducer extends UnboundDaysFunctionBase { + override def bind(inputType: StructType): BoundFunction = { + if (inputType.size == 1 && isValidType(inputType.head.dataType)) { + DaysFunctionWithCompatiblePhysicalTypeReducer + } else throw new UnsupportedOperationException( + "'days' only take date or timestamp as input type") } +} - override def description(): String = name() +object UnboundDaysFunctionWithIncompatiblePhysicalTypeReducer extends UnboundDaysFunctionBase { + override def bind(inputType: StructType): BoundFunction = { + if (inputType.size == 1 && isValidType(inputType.head.dataType)) { + DaysFunctionWithIncompatiblePhysicalTypeReducer + } else throw new UnsupportedOperationException( + "'days' only take date or timestamp as input type") + } +} + +abstract class DaysFunctionBase extends ScalarFunction[Int] { + override def inputTypes(): Array[DataType] = Array(TimestampType) + override def resultType(): DataType = DateType override def name(): String = "days" + override def canonicalName(): String = name() +} + +// This `days` function reduces `DateType` partitions keys to `IntegerType` partitions keys when +// partitions are reduced to partitions of a `years` function, which produces `IntegerType` keys. +object DaysFunction extends DaysFunctionBase with ReducibleFunction[Int, Int] { + override def reducer(otherFunc: ReducibleFunction[_, _]): Reducer[Int, Int] = { + if (otherFunc == YearsFunction) { + DaysToYearsReducer() + } else { + null + } + } +} + +// This `days` function reduces `DateType` partitions keys to `DateType` partitions keys when +// partitions are reduced to partitions of a `years` function, which produces `IntegerType` keys. +// `DateType` and `IntegerType` share the same `PhysicalDataType`. 
+object DaysFunctionWithCompatiblePhysicalTypeReducer + extends DaysFunctionBase with ReducibleFunction[Int, Int] { + override def reducer(otherFunc: ReducibleFunction[_, _]): Reducer[Int, Int] = { + if (otherFunc == YearsFunction) { + DaysToYearsReducerWithCompatiblePhysicalType() + } else { + null + } + } +} + +// This `days` function reduces `DateType` partition keys to `LongType` partition keys when +// partitions are reduced to partitions of a `years` function, which produces `IntegerType` keys. +// `LongType` and `IntegerType` have different `PhysicalDataType`s. +object DaysFunctionWithIncompatiblePhysicalTypeReducer + extends DaysFunctionBase with ReducibleFunction[Int, Long] { + override def reducer(otherFunc: ReducibleFunction[_, _]): Reducer[Int, Long] = { + if (otherFunc == YearsFunction) { + DaysToYearsReducerWithIncompablePhysicalType() + } else { + null + } + } +} + +abstract class DaysToYearsReducerBase { + val UTC: ZoneId = ZoneId.of("UTC") + val EPOCH_LOCAL_DATE: LocalDate = Instant.EPOCH.atZone(UTC).toLocalDate +} + +case class DaysToYearsReducer() extends DaysToYearsReducerBase with Reducer[Int, Int] { + override def reduce(days: Int): Int = { + val localDate = EPOCH_LOCAL_DATE.plusDays(days) + ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate).toInt + } + + override def resultType(): DataType = IntegerType +} + +// No `resultType()` override means that the reduced type is the original `DateType`. 
+case class DaysToYearsReducerWithCompatiblePhysicalType() + extends DaysToYearsReducerBase with Reducer[Int, Int] { + override def reduce(days: Int): Int = { + val localDate = EPOCH_LOCAL_DATE.plusDays(days) + ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate).toInt + } +} + +case class DaysToYearsReducerWithIncompablePhysicalType() + extends DaysToYearsReducerBase with Reducer[Int, Long] { + override def reduce(days: Int): Long = { + val localDate = EPOCH_LOCAL_DATE.plusDays(days) + ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate) + } + + override def resultType(): DataType = LongType } object UnboundBucketFunction extends UnboundFunction { From 494b92393519631b92f00a2e55a4fb54aeebad2c Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 18 Mar 2026 17:32:55 +0100 Subject: [PATCH 2/6] fix config --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5cf833d4cdf9d..11fb4b994d00d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2124,7 +2124,7 @@ object SQLConf { val V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS = buildConf("spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled") .doc("Whether to allow storage-partition join in the case where the partition transforms " + - "are compatible but not identical. This config requires both " + + "are compatible but not identical. This config requires both " + s"${V2_BUCKETING_ENABLED.key} and ${V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key} to be " + s"enabled and ${V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " + "to be disabled." 
@@ -2134,10 +2134,13 @@ object SQLConf { .createWithDefault(false) val V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES = - buildConf("spark.sql.legacy.allowIncompatibleTransformTypes.enabled") - .doc("Whether to allow storage-partition join where the partition transforms produce " + - "incompatible reduced types and use the left partition key type for comparison.") + buildConf("spark.sql.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled") + .doc("Whether to allow storage-partition join where the left and right partition " + + "transforms are reduced to differing logical types and in that case use the left reduced " + + "logical types for comparison. This config requires " + + s"${V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key} to be enabled.") .version("4.2.0") + .withBindingPolicy(ConfigBindingPolicy.SESSION) .booleanConf .createWithDefault(true) From e31b3619243c6f1d383b82898aca1729d6cf350e Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 18 Mar 2026 18:03:52 +0100 Subject: [PATCH 3/6] fix expected ordering type of years transform --- .../spark/sql/connector/WriteDistributionAndOrderingSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala index 7c4852c5e22d5..588490e07dfd6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala @@ -1174,7 +1174,7 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase Invoke( Literal.create(YearsFunction, ObjectType(YearsFunction.getClass)), "invoke", - LongType, + IntegerType, Seq(Cast(attr("day"), TimestampType, Some("America/Los_Angeles"))), Seq(TimestampType), propagateNull = false), From 
a00c069b55b86d00a24567b05dd378e58b709738 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 18 Mar 2026 19:47:30 +0100 Subject: [PATCH 4/6] address review comments --- .../spark/sql/connector/catalog/functions/Reducer.java | 7 +++++-- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java index 6772b454efc7f..fa762257359ab 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java @@ -42,8 +42,11 @@ public interface Reducer { O reduce(I arg); /** - * Returns the {@link DataType data type} of values produced by this function. - * It can return null to signal it doesn't change the input type. + * Returns the {@link DataType data type} of values produced by this reducer. + * + * As a reducer doesn't know the result {@link DataType data type} of the reduced transform + * function, for compatibility reasons it can return null to signal it doesn't change the type of + * partition keys when the keys are reduced. * * @return a data type for values produced by this function. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 11fb4b994d00d..cdcbb9de300f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2124,7 +2124,7 @@ object SQLConf { val V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS = buildConf("spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled") .doc("Whether to allow storage-partition join in the case where the partition transforms " + - "are compatible but not identical. This config requires both " + + "are compatible but not identical. This config requires both " + s"${V2_BUCKETING_ENABLED.key} and ${V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key} to be " + s"enabled and ${V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " + "to be disabled." From c20b3012bbe6a7f3aff32c219c6cb5e3b88cffa7 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 19 Mar 2026 09:19:03 +0100 Subject: [PATCH 5/6] Extract `TypedReducer` from `Reducer` --- .../connector/catalog/functions/Reducer.java | 18 +++----- .../catalog/functions/TypedReducer.java | 45 +++++++++++++++++++ .../plans/physical/partitioning.scala | 4 +- .../functions/transformFunctions.scala | 6 +-- 4 files changed, 55 insertions(+), 18 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/TypedReducer.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java index 7ba4024af68ed..dc568cf4ccbfd 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java @@ -33,8 +33,11 @@ * r1(f_source(x)) = r2(f_target(x)) for all input 
x. * * - * @param reducer input type - * @param reducer output type + *

If the reducer changes the logical Spark {@link DataType data type} of the values it produces, + * implement {@link TypedReducer} instead. + * + * @param the physical Java type of the input + * @param the physical Java type of the output * @since 4.0.0 */ @Evolving @@ -48,15 +51,4 @@ public interface Reducer { default String displayName() { return getClass().getSimpleName(); } - - /** - * Returns the {@link DataType data type} of values produced by this reducer. - * - * As a reducer doesn't know the result {@link DataType data type} of the reduced transform - * function, for compatibility reasons it can return null to signal it doesn't change the type of - * partition keys when the keys are reduced. - * - * @return a data type for values produced by this function. - */ - default DataType resultType() { return null; } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/TypedReducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/TypedReducer.java new file mode 100644 index 0000000000000..89b94d17f6313 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/TypedReducer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connector.catalog.functions; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.types.DataType; + +/** + * A {@link Reducer} that changes the {@link DataType data type} of the values it produces. + * + *

Implement this interface instead of {@link Reducer} when the reducer produces values of a + * different logical Spark {@link DataType data type} than its input. If a {@link ReducibleFunction} + * returns a {@code TypedReducer} from its {@code reducer()} method, the output type is taken from + * {@link #resultType()}. If it returns a plain {@link Reducer}, the output type is assumed to be + * unchanged. + * + * @param the physical Java type of the input + * @param the physical Java type of the output + * @see Reducer + * @see ReducibleFunction + * @since 4.2.0 + */ +@Evolving +public interface TypedReducer extends Reducer { + /** + * Returns the logical Spark {@link DataType data type} of values produced by this reducer. + * + * @return the data type of values produced by this reducer. + */ + DataType resultType(); +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 7d1c80e3f903e..f81a55fbc771e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -24,7 +24,7 @@ import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper -import org.apache.spark.sql.connector.catalog.functions.Reducer +import org.apache.spark.sql.connector.catalog.functions.{Reducer, TypedReducer} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, IntegerType} @@ -591,7 +591,7 @@ object KeyedPartitioning { dataTypes: Seq[DataType], reducers: Seq[Option[Reducer[_, _]]]): Seq[DataType] = { dataTypes.zip(reducers).map { - case (t, Some(reducer: Reducer[Any, Any])) => 
Option(reducer.resultType()).getOrElse(t) + case (t, Some(reducer: TypedReducer[Any, Any])) => reducer.resultType() case (t, _) => t } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala index 3d1b996503cc0..99f0cee2b4dec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala @@ -145,7 +145,7 @@ abstract class DaysToYearsReducerBase { val EPOCH_LOCAL_DATE: LocalDate = Instant.EPOCH.atZone(UTC).toLocalDate } -case class DaysToYearsReducer() extends DaysToYearsReducerBase with Reducer[Int, Int] { +case class DaysToYearsReducer() extends DaysToYearsReducerBase with TypedReducer[Int, Int] { override def reduce(days: Int): Int = { val localDate = EPOCH_LOCAL_DATE.plusDays(days) ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate).toInt @@ -154,7 +154,7 @@ case class DaysToYearsReducer() extends DaysToYearsReducerBase with Reducer[Int, override def resultType(): DataType = IntegerType } -// No `resultType()` override means that the reduced type is the original `DateType`. +// Not a `TypedReducer`, so the reduced type is the original `DateType`. 
case class DaysToYearsReducerWithCompatiblePhysicalType() extends DaysToYearsReducerBase with Reducer[Int, Int] { override def reduce(days: Int): Int = { @@ -164,7 +164,7 @@ case class DaysToYearsReducerWithCompatiblePhysicalType() } case class DaysToYearsReducerWithIncompablePhysicalType() - extends DaysToYearsReducerBase with Reducer[Int, Long] { + extends DaysToYearsReducerBase with TypedReducer[Int, Long] { override def reduce(days: Int): Long = { val localDate = EPOCH_LOCAL_DATE.plusDays(days) ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate) From 595d59ebf924bc30bdf47813b2f664d1b97bad07 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 19 Mar 2026 18:38:23 +0100 Subject: [PATCH 6/6] address review comments --- .../connector/catalog/functions/Reducer.java | 1 + .../apache/spark/sql/internal/SQLConf.scala | 5 +- .../KeyGroupedPartitioningSuite.scala | 46 +++++++++---------- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java index dc568cf4ccbfd..9b4f6baad6a8a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java @@ -32,6 +32,7 @@ *

  • More generally, there exists reducer functions r1(x) and r2(x) such that * r1(f_source(x)) = r2(f_target(x)) for all input x.
  • * + * where {@code =} means that both the value and the data type match. * *

    If the reducer changes the logical Spark {@link DataType data type} of the values it produces, * implement {@link TypedReducer} instead. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d7f1993e03bb8..3916b6e018858 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2145,7 +2145,8 @@ object SQLConf { .createWithDefault(false) val V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES = - buildConf("spark.sql.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled") + buildConf("spark.sql.legacy.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled") + .internal() .doc("Whether to allow storage-partition join where the left and right partition " + "transforms are reduced to differing logical types and in that case use the left reduced " + "logical types for comparison. 
This config requires " + @@ -2153,7 +2154,7 @@ object SQLConf { .version("4.2.0") .withBindingPolicy(ConfigBindingPolicy.SESSION) .booleanConf - .createWithDefault(true) + .createWithDefault(false) val V2_BUCKETING_PARTITION_FILTER_ENABLED = buildConf("spark.sql.sources.v2.bucketing.partition.filter.enabled") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index e5d18a33197b1..ff55537cc4842 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -3329,33 +3329,33 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with s"testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.time = i.arrive_time", s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time" ).foreach { joinSting => - val df = sql( - s""" - |${selectWithMergeJoinHint("i", "p")} id, item_id - |FROM $joinSting - |ORDER BY id, item_id - |""".stripMargin) + val e = intercept[SparkException] { + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} id, item_id + |FROM $joinSting + |ORDER BY id, item_id + |""".stripMargin) - val shuffles = collectShuffles(df.queryExecution.executedPlan) - assert(shuffles.isEmpty, "should not add shuffle for both sides of the join") - val groupPartitions = collectGroupPartitions(df.queryExecution.executedPlan) - assert(groupPartitions.forall(_.outputPartitioning.numPartitions == 2)) + df.collect() + } + assert(e.getMessage.startsWith( + "Storage-partition join partition transforms produced incompatible reduced types")) - checkAnswer(df, Seq(Row(0, 1), Row(1, 1))) + withSQLConf(SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> "true") { + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} id, item_id + |FROM $joinSting 
+ |ORDER BY id, item_id + |""".stripMargin) - withSQLConf(SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> "false") { - val e = intercept[SparkException] { - val df = sql( - s""" - |${selectWithMergeJoinHint("i", "p")} id, item_id - |FROM $joinSting - |ORDER BY id, item_id - |""".stripMargin) + val shuffles = collectShuffles(df.queryExecution.executedPlan) + assert(shuffles.isEmpty, "should not add shuffle for both sides of the join") + val groupPartitions = collectGroupPartitions(df.queryExecution.executedPlan) + assert(groupPartitions.forall(_.outputPartitioning.numPartitions == 2)) - df.collect() - } - assert(e.getMessage.startsWith( - "Storage-partition join partition transforms produced incompatible reduced types")) + checkAnswer(df, Seq(Row(0, 1), Row(1, 1))) } } }