Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,42 @@ class FunctionResolution(
None
}

/**
 * Resolve an internal function by name. We first try the fully qualified identifier in the
 * `system.session` namespace; if the internal registry does not know it under that identifier,
 * we fall back to looking it up by the bare name.
 */
private def resolveInternalFunction(
    name: String, arguments: Seq[Expression]): Expression = {
  val qualifiedIdentifier = FunctionIdentifier(
    name, Some(CatalogManager.SESSION_NAMESPACE), Some(CatalogManager.SYSTEM_CATALOG_NAME))
  // Pick the identifier to look up, then perform a single lookup call.
  val identifierToLookUp = if (FunctionRegistry.internal.functionExists(qualifiedIdentifier)) {
    qualifiedIdentifier
  } else {
    FunctionIdentifier(name)
  }
  FunctionRegistry.internal.lookupFunction(identifierToLookUp, arguments)
}

/**
 * Resolve a built-in or temporary function for a single-part `name`. Internal functions
 * (flagged on the [[UnresolvedFunction]]) are resolved from the internal registry; otherwise
 * resolution is delegated to the session catalog. Multi-part names produce `None`. A resolved
 * function is validated against the argument count before being returned.
 */
def resolveBuiltinOrTempFunction(
    name: Seq[String],
    arguments: Seq[Expression],
    u: UnresolvedFunction): Option[Expression] = {
  val candidate = name match {
    case Seq(singlePartName) if u.isInternal =>
      // Option(...) guards against a `null` from the internal lookup.
      Option(resolveInternalFunction(singlePartName, arguments))
    case Seq(singlePartName) =>
      v1SessionCatalog.resolveBuiltinOrTempFunction(singlePartName, arguments)
    case _ =>
      None
  }
  candidate.map(validateFunction(_, arguments.length, u))
}

/**
 * Resolve an [[UnresolvedTableValuedFunction]] into a [[LogicalPlan]]. Throws
 * [[NoSuchFunctionException]] when no table function with that name exists, splitting the
 * multi-part name into a database prefix and a function name for the error.
 */
def resolveTableValuedFunction(u: UnresolvedTableValuedFunction): LogicalPlan = {
  resolveTableFunction(u.name, u.functionArgs) match {
    case Some(resolvedPlan) =>
      resolvedPlan
    case None =>
      throw new NoSuchFunctionException(
        db = u.name.dropRight(1).mkString("."),
        func = u.name.last)
  }
}

/**
* Check if the arguments of a function are either resolved or a lambda function.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.analysis.resolver

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}

/**
* The [[ExpressionResolutionContext]] is a state that is propagated between the nodes of the
Expand Down Expand Up @@ -45,6 +45,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression
* [[ExpressionResolutionContext]] has LCA in its subtree.
* @param hasWindowExpressions A flag that highlights that a specific node corresponding to
* [[ExpressionResolutionContext]] has [[WindowExpression]]s in its subtree.
* @param hasGeneratorExpressions A flag that highlights that a specific node corresponding to
* [[ExpressionResolutionContext]] has [[Generator]] expressions in its subtree.
* @param shouldPreserveAlias A flag indicating whether we preserve the [[Alias]] e.g. if it is on
* top of a [[Project.projectList]]. If it is `false`, extra [[Alias]]es have to be stripped
* away.
Expand Down Expand Up @@ -84,6 +86,15 @@ import org.apache.spark.sql.catalyst.expressions.Expression
* @param resolvingUnresolvedAlias A flag indicating whether we are resolving a tree under an
* [[UnresolvedAlias]]. This is needed in order to prevent alias collapsing before the name of
* [[UnresolvedAlias]] above is computed.
* @param resolvingPivotAggregates A flag indicating whether we are resolving a tree under the
* [[Pivot.aggregates]]. This is needed for validation of those expressions.
* @param hasGroupingAnalyticsExpression A flag indicating whether a specific node corresponding to
* [[ExpressionResolutionContext]] has grouping expressions ([[Grouping]] or [[GroupingID]]) in
* its subtree.
* @param extractValueExtractionKey Extraction key for [[UnresolvedExtractValue]] if we are
* currently resolving one, None otherwise.
* @param lambdaVariableMap A map of lambda variable names to their corresponding
* [[NamedExpression]]s used to resolve [[UnresolvedLambdaVariable]]s inside lambda functions.
*/
class ExpressionResolutionContext(
val parentContext: Option[ExpressionResolutionContext] = None,
Expand All @@ -103,7 +114,11 @@ class ExpressionResolutionContext(
var hasCorrelatedScalarSubqueryExpressions: Boolean = false,
var resolvingTreeUnderAggregateExpression: Boolean = false,
var resolvingCreateNamedStruct: Boolean = false,
var resolvingUnresolvedAlias: Boolean = false) {
var resolvingUnresolvedAlias: Boolean = false,
var resolvingPivotAggregates: Boolean = false,
var hasGroupingAnalyticsExpression: Boolean = false,
var extractValueExtractionKey: Option[Expression] = None,
var lambdaVariableMap: Option[IdentifierMap[NamedExpression]] = None) {

/**
* Propagate generic information that is valid across the whole expression tree from the
Expand All @@ -116,6 +131,7 @@ class ExpressionResolutionContext(
hasAttributeOutsideOfAggregateExpressions |= child.hasAttributeOutsideOfAggregateExpressions
hasLateralColumnAlias |= child.hasLateralColumnAlias
hasWindowExpressions |= child.hasWindowExpressions
hasGroupingAnalyticsExpression |= child.hasGroupingAnalyticsExpression
hasCorrelatedScalarSubqueryExpressions |= child.hasCorrelatedScalarSubqueryExpressions
}
}
Expand All @@ -135,7 +151,9 @@ object ExpressionResolutionContext {
resolvingWindowFunction = parent.resolvingWindowFunction,
windowFunctionNestednessLevel = parent.windowFunctionNestednessLevel,
resolvingWindowSpec = parent.resolvingWindowSpec,
resolvingUnresolvedAlias = parent.resolvingUnresolvedAlias
resolvingUnresolvedAlias = parent.resolvingUnresolvedAlias,
resolvingPivotAggregates = parent.resolvingPivotAggregates,
lambdaVariableMap = parent.lambdaVariableMap
)
} else {
new ExpressionResolutionContext(
Expand All @@ -147,7 +165,9 @@ object ExpressionResolutionContext {
windowFunctionNestednessLevel = parent.windowFunctionNestednessLevel,
resolvingWindowSpec = parent.resolvingWindowSpec,
resolvingCreateNamedStruct = parent.resolvingCreateNamedStruct,
resolvingUnresolvedAlias = parent.resolvingUnresolvedAlias
resolvingUnresolvedAlias = parent.resolvingUnresolvedAlias,
resolvingPivotAggregates = parent.resolvingPivotAggregates,
lambdaVariableMap = parent.lambdaVariableMap
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{
Aggregate,
Filter,
LogicalPlan,
Pivot,
Project,
Sort
}
Expand Down Expand Up @@ -133,7 +134,6 @@ class ExpressionResolver(
private val scopes = resolver.getNameScopes
private val subqueryRegistry = resolver.getSubqueryRegistry
private val operatorResolutionContextStack = resolver.getOperatorResolutionContextStack

private val aliasResolver = new AliasResolver(this)
private val timezoneAwareExpressionResolver = new TimezoneAwareExpressionResolver(this)
private val binaryArithmeticResolver = new BinaryArithmeticResolver(this)
Expand Down Expand Up @@ -221,6 +221,53 @@ class ExpressionResolver(
resolvedExpression
}

/**
 * Resolve every expression in [[Pivot.aggregates]]. Each expression goes through
 * [[resolveExpressionTreeInOperatorImpl]] with the `resolvingPivotAggregates` flag raised, so
 * that downstream resolution knows it is operating under pivot aggregates.
 */
def resolvePivotAggregates(pivot: Pivot): Seq[Expression] = {
  pivot.aggregates.map { aggregateExpression =>
    // Only the resolved expression is needed here; the resolution context is discarded.
    resolveExpressionTreeInOperatorImpl(
      unresolvedExpression = aggregateExpression,
      parentOperator = pivot,
      resolvingPivotAggregates = true
    )._1
  }
}

/**
 * Resolve [[Unpivot.values]] or [[Unpivot.ids]] expressions. [[Star]] expressions are expanded
 * first (inside a fresh traversal scoped to `unpivot`), after which each expression is resolved
 * via [[resolveExpressionTreeInOperatorImpl]]. `shouldPreserveAlias` is set because both
 * [[Unpivot.values]] and [[Unpivot.ids]] are sequences of [[NamedExpression]]s whose aliases
 * must survive resolution.
 */
def resolveUnpivotArguments(
    arguments: Seq[Expression],
    unpivot: LogicalPlan): Seq[NamedExpression] = {
  val expandedArguments = traversals.withNewTraversal(unpivot) {
    expandStarExpressions(arguments)
  }

  expandedArguments.map { expandedArgument =>
    val (resolvedArgument, _) = resolveExpressionTreeInOperatorImpl(
      unresolvedExpression = expandedArgument,
      parentOperator = unpivot,
      shouldPreserveAlias = true
    )
    // Unpivot arguments are named expressions by construction, so the cast is safe here.
    resolvedArgument.asInstanceOf[NamedExpression]
  }
}

/**
 * Replace each [[Star]] in `expressions` with its expansion, leaving all other expressions
 * untouched and preserving their order.
 */
def expandStarExpressions(expressions: Seq[Expression]): Seq[Expression] =
  expressions.flatMap {
    case starExpression: Star => expandStar(starExpression)
    case nonStarExpression => Seq(nonStarExpression)
  }

/**
* This method is an expression analysis entry point. The method first checks if the expression
* has already been resolved (necessary because of partially-unresolved subtrees, see
Expand Down Expand Up @@ -603,7 +650,8 @@ class ExpressionResolver(
unresolvedExpression: Expression,
parentOperator: LogicalPlan,
shouldPreserveAlias: Boolean = false,
resolvingGroupingExpressions: Boolean = false
resolvingGroupingExpressions: Boolean = false,
resolvingPivotAggregates: Boolean = false
): (Expression, ExpressionResolutionContext) = {
traversals.withNewTraversal(
parentOperator = parentOperator,
Expand All @@ -615,7 +663,8 @@ class ExpressionResolver(
new ExpressionResolutionContext(
isRoot = true,
shouldPreserveAlias = shouldPreserveAlias,
resolvingGroupingExpressions = resolvingGroupingExpressions
resolvingGroupingExpressions = resolvingGroupingExpressions,
resolvingPivotAggregates = resolvingPivotAggregates
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis.resolver

import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExtractValue}

/**
* Resolver for [[UnresolvedExtractValue]]. Resolve the [[UnresolvedExtractValue]] by resolving its
* extraction key and child.
*
* [[UnresolvedExtractValue]] is constructed in parser, when indexing an
* array or a map with a key. For example, in the following query:
*
* {{{
* SELECT col1.a, col2[0], col3['a']
* FROM VALUES (named_struct('a', 1, 'b', 2), array(1,2,3), map('a', 1, 'b', 2));
* }}}
*
* - `col1.a` is parsed as [[UnresolvedAttribute]]
* - `col2[0]` is parsed as [[UnresolvedExtractValue]] with `col2` as child and `0` as extraction
* key.
* - `col3['a']` is parsed as [[UnresolvedExtractValue]] with `col3` as child and `'a'` as
* extraction key.
*/
class ExtractValueResolver(expressionResolver: ExpressionResolver)
    extends TreeNodeResolver[UnresolvedExtractValue, Expression]
    with CoercesExpressionTypes {
  private val traversals = expressionResolver.getExpressionTreeTraversals
  private val expressionResolutionContextStack =
    expressionResolver.getExpressionResolutionContextStack

  /**
   * Resolve an [[UnresolvedExtractValue]] in three steps: resolve the extraction key, resolve
   * the child while the key is published on the current [[ExpressionResolutionContext]] (so
   * that [[NameScope]] can resolve attributes inside the [[ExtractValue]] expression), and
   * finally construct the [[ExtractValue]]. If construction yields an [[ExtractValue]], it is
   * additionally type-coerced (and collected for window resolution where applicable) via
   * [[handleResolvedExtractValue]].
   */
  def resolve(unresolvedExtractValue: UnresolvedExtractValue): Expression = {
    val resolvedExtractionKey = expressionResolver.resolve(unresolvedExtractValue.extraction)

    val resolvedChild =
      resolveChildWithExtractionKey(unresolvedExtractValue.child, resolvedExtractionKey)

    val extractValueCandidate = ExtractValue.apply(
      child = resolvedChild,
      extraction = resolvedExtractionKey,
      resolver = conf.resolver
    )

    extractValueCandidate match {
      case extractValue: ExtractValue => handleResolvedExtractValue(extractValue)
      case nonExtractValue => nonExtractValue
    }
  }

  /**
   * Resolve `child` while the resolved extraction key is visible on the current resolution
   * context. The key is always cleared afterwards, even if child resolution throws.
   */
  private def resolveChildWithExtractionKey(
      child: Expression,
      resolvedExtractionKey: Expression): Expression = {
    expressionResolutionContextStack.peek().extractValueExtractionKey =
      Some(resolvedExtractionKey)
    try {
      expressionResolver.resolve(child)
    } finally {
      // Re-peek on purpose: clear the key on whatever context is current after resolution.
      expressionResolutionContextStack.peek().extractValueExtractionKey = None
    }
  }

  /**
   * Coerce recursive types ([[ExtractValue]] expressions) bottom-up and collect attribute
   * references required for window resolution. For example:
   *
   * {{{
   * CREATE OR REPLACE TABLE t(col MAP<BIGINT, DOUBLE>);
   * SELECT col.field FROM t;
   * }}}
   *
   * Here the inner field must be cast from `String` to `BIGINT`, so the analyzed plan looks
   * like:
   *
   * {{{
   * Project [col#x[cast(field as bigint)] AS field#x]
   * +- SubqueryAlias spark_catalog.default.t
   *    +- Relation spark_catalog.default.t[col#x] parquet
   * }}}
   *
   * This keeps the single-pass resolver compatible with the fixed-point implementation.
   */
  def handleResolvedExtractValue(extractValue: ExtractValue): Expression = {
    extractValue.transformUp {
      case attributeReference: AttributeReference =>
        coerceExpressionTypes(attributeReference, traversals.current)
      case otherField =>
        coerceExpressionTypes(otherField, traversals.current)
    }
  }
}
Loading