
Commit 0cad1cd

ksbeyer authored and cloud-fan committed
[SPARK-53707] Improve attribute metadata handling
### What changes were proposed in this pull request?

Improve how metadata is propagated during analysis and rewrites.

- applyColumnMetadata: tried to avoid adding an Alias, but that caused incorrect propagation.
- collapseAlias and trimNonTopLevelAliases: now preserve the meaning of stacked aliases.
- removeRedundantAlias: now removes a redundant alias when the only extra key is AUTO_GENERATED_ALIAS.
- Other minor cleanup.

This PR improves metadata propagation, but it does not solve all the problems; a thorough review is recommended. Also recommended: make an Alias unresolved while it is still inferring metadata, and once its child is resolved, make the inferred metadata explicit. The metadata would then be significantly more stable.

### Why are the changes needed?

Wrong metadata is inferred for DML, and probably in other places.

### Does this PR introduce _any_ user-facing change?

Changing metadata can change DML behavior. Not removing aliases can cause some Project operators to remain.

### How was this patch tested?

Tests were added with the previous PR on this topic, [SPARK-52772].

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52449 from ksbeyer/SPARK-53707-better-metadata.

Authored-by: ksbeyer <168490557+ksbeyer@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 470d622 commit 0cad1cd

File tree

17 files changed

+243 -118 lines changed


sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala

Lines changed: 18 additions & 0 deletions
@@ -90,6 +90,24 @@ sealed class Metadata private[types] (private[types] val map: Map[String, Any])
   /** Gets a Metadata array. */
   def getMetadataArray(key: String): Array[Metadata] = get(key)
 
+  /** Return a copy with the keys removed */
+  def withKeysRemoved(keysToRemove: Seq[String]): Metadata = {
+    if (keysToRemove.isEmpty) {
+      this
+    } else {
+      new Metadata(this.map -- keysToRemove)
+    }
+  }
+
+  /** Return a copy with a key removed */
+  def withKeyRemoved(keyToRemove: String): Metadata = {
+    if (map.contains(keyToRemove)) {
+      new Metadata(map - keyToRemove)
+    } else {
+      this
+    }
+  }
+
   /** Converts to its JSON representation. */
   def json: String = compact(render(jsonValue))
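As a quick usage sketch of the two new helpers (the keys and values below are illustrative, not from the patch):

import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

// Build a Metadata value with two keys (example values).
val md = new MetadataBuilder()
  .putString("comment", "user id")
  .putBoolean("derived", true)
  .build()

// Removing a present key returns a new Metadata without it.
val trimmed = md.withKeyRemoved("derived")
assert(!trimmed.contains("derived"))

// Removing nothing, or a key that is absent, returns the same instance.
assert(md.withKeysRemoved(Seq.empty) eq md)
assert(trimmed.withKeyRemoved("derived") eq trimmed)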

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala

Lines changed: 24 additions & 6 deletions
@@ -133,7 +133,8 @@ object TableOutputResolver extends SQLConfHelper with Logging {
     val canWriteExpr = canWrite(
       tableName, valueType, colType, byName = true, conf, addError, colPath)
     if (canWriteExpr) {
-      applyColumnMetadata(checkNullability(value, col, conf, colPath), col)
+      val nullsHandled = checkNullability(value, col, conf, colPath)
+      applyColumnMetadata(nullsHandled, col)
     } else {
       value
     }
@@ -222,12 +223,29 @@
     val requiredMetadata = CharVarcharUtils.cleanMetadata(column.metadata)
 
     // Make sure that the result has the requiredMetadata and only that.
-    // If the expr is an Attribute or NamedLambdaVariable with the proper name and metadata,
-    // it should remain stable, but we do not trust that other NamedAttributes will
-    // remain stable (namely Alias).
+    //
+    // If the expr is a NamedLambdaVariable, it must be from our handling of structured
+    // array or map fields; the Alias will be added on the outer structured value.
+    //
+    // Even an Attribute with the proper name and metadata is not enough to prevent
+    // source query metadata leaking to the Write after rewrites, ie:
+    //   case a: Attribute if a.name == column.name && a.metadata == requiredMetadata => a
+    //
+    // The problem is that an Attribute can be replaced by what it refers to, for example:
+    //   Project AttrRef(metadata={}, exprId=2)
+    //     Project Alias(
+    //       cast(AttrRef(metadata={source_field_default_value}, exprId=1) as same_type),
+    //       exprId=2,
+    //       explicitMetadata=None)  -- metadata.isEmpty
+    // gets rewritten to:
+    //   Project Alias(
+    //     AttrRef(metadata={source_field_default_value}, exprId=1),
+    //     exprId=2,
+    //     explicitMetadata=None)  -- metadata.nonEmpty !!
+    //
+    // So we always add an Alias(expr, name, explicitMetadata = Some(requiredMetadata))
+    // to prevent expr from leaking the source query metadata into the Write.
     expr match {
-      case a: Attribute if a.name == column.name && a.metadata == requiredMetadata =>
-        a
       case v: NamedLambdaVariable if v.name == column.name && v.metadata == requiredMetadata =>
         v
       case _ =>
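To make the fallback concrete, here is a minimal sketch of the pattern the default case applies; the helper name pinMetadata is hypothetical, but Alias's explicitMetadata parameter is the mechanism the patch relies on:

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
import org.apache.spark.sql.types.Metadata

// Wrap expr so its reported metadata is exactly requiredMetadata, no matter
// what later rewrites substitute underneath: explicitMetadata overrides any
// metadata the child would otherwise contribute.
def pinMetadata(expr: Expression, name: String, requiredMetadata: Metadata): Alias =
  Alias(expr, name)(explicitMetadata = Some(requiredMetadata))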

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AliasResolver.scala

Lines changed: 7 additions & 16 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.analysis.resolver
 import org.apache.spark.sql.catalyst.analysis.{AliasResolution, UnresolvedAlias}
 import org.apache.spark.sql.catalyst.expressions.{
   Alias,
+  AliasHelper,
   Expression,
   NamedExpression,
   OuterReference
@@ -31,7 +32,8 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
  */
 class AliasResolver(expressionResolver: ExpressionResolver)
     extends TreeNodeResolver[UnresolvedAlias, Expression]
-    with ResolvesExpressionChildren {
+    with ResolvesExpressionChildren
+    with AliasHelper {
   private val scopes = expressionResolver.getNameScopes
   private val expressionResolutionContextStack =
     expressionResolver.getExpressionResolutionContextStack
@@ -115,30 +117,19 @@ class AliasResolver(expressionResolver: ExpressionResolver)
    *
    *   Project[
    *     Alias("alias_2")(
-   *       Alias("alias_1")(id)
-   *     )
+   *       Alias("alias_1")(id1)
+   *     )(id2)
    *   ]( ... )
    *
    * and after the `collapseAlias` call (removing the bottom one) it would be:
    *
    *   Project[
-   *     Alias("alias_2")(id)
+   *     Alias("alias_2")(id2)
    *   ]( ... )
    */
   private def collapseAlias(alias: Alias): Alias =
     alias.child match {
-      case innerAlias: Alias =>
-        val metadata = if (alias.metadata.isEmpty) {
-          None
-        } else {
-          Some(alias.metadata)
-        }
-        alias.copy(child = innerAlias.child)(
-          exprId = alias.exprId,
-          qualifier = alias.qualifier,
-          explicitMetadata = metadata,
-          nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys
-        )
+      case _: Alias => mergeAliases(alias)
       case _ => alias
     }
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala

Lines changed: 69 additions & 14 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import scala.annotation.tailrec
+
 import org.apache.spark.sql.catalyst.analysis.MultiAlias
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project}
@@ -68,8 +70,8 @@ trait AliasHelper {
    * but keep the name of the outermost attribute.
    */
   protected def replaceAliasButKeepName(
-    expr: NamedExpression,
-    aliasMap: AttributeMap[Alias]): NamedExpression = {
+      expr: NamedExpression,
+      aliasMap: AttributeMap[Alias]): NamedExpression = {
     expr match {
       // We need to keep the `Alias` if we replace a top-level Attribute, so that it's still a
       // `NamedExpression`. We also need to keep the name of the original Attribute.
@@ -90,29 +92,82 @@ trait AliasHelper {
         case a: Alias if a.metadata != Metadata.empty => a
         case other => trimAliases(other)
       }
-    case a @ Alias(child, _) => trimAliases(child)
+    case Alias(child, _) => trimAliases(child)
     case MultiAlias(child, _) => trimAliases(child)
     case other => other.mapChildren(trimAliases)
   }
 
   protected def trimNonTopLevelAliases[T <: Expression](e: T): T = {
     val res = CurrentOrigin.withOrigin(e.origin) {
       e match {
-        case a: Alias =>
-          // Preserve the _effective_ metadata.
-          a.copy(child = trimAliases(a.child))(
-            exprId = a.exprId,
-            qualifier = a.qualifier,
-            explicitMetadata = Some(a.metadata),
-            nonInheritableMetadataKeys = Nil)
-        case a: MultiAlias =>
-          a.copy(child = trimAliases(a.child))
+        case a: Alias => mergeAndTrimAliases(a)
+        case a: MultiAlias => a.copy(child = trimAliases(a.child))
         case other => trimAliases(other)
       }
     }
-
     res.copyTagsFrom(e)
-
     res.asInstanceOf[T]
   }
+
+  /**
+   * Merge any stack of aliases under the top-level alias, and then
+   * drops any aliases deeper in the expression tree.
+   * So Alias1(Alias2(Alias3(Foo(Alias4(x))))) becomes
+   *   Alias5(Foo(x))
+   * where Alias5 preserves the metadata of Alias{1,2,3}
+   * and the name and exprId of Alias1.
+   * Alias4 is simply removed.
+   */
+  @tailrec
+  protected final def mergeAndTrimAliases(alias: Alias): Alias = {
+    alias.child match {
+      case _: Alias => mergeAndTrimAliases(mergeAliases(alias))
+      case other => alias.withNewChild(trimAliases(other))
+    }
+  }
+
+  /**
+   * Merge an Alias(Alias(x)) into Alias(x) preserving metadata.
+   *
+   * If the outer alias has explicit metadata,
+   *   it is preserved.
+   * Else if the inner alias has explicit metadata,
+   *   the result has explicit outer.metadata.
+   * Else both are deriving the metadata,
+   *   the result is deriving metadata,
+   *   with the union of noninheritable keys.
+   *
+   * @param alias An Alias with a child Alias, Alias(Alias(x))
+   * @return The merged alias, Alias(x)
+   */
+  protected final def mergeAliases(alias: Alias): Alias = {
+    val child = alias.child.asInstanceOf[Alias]
+    var explicitMetadata = alias.explicitMetadata
+    var nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys
+
+    if (explicitMetadata.isDefined) {
+      // Outer alias is explicit; we can ignore inner metadata.
+      // The outer nonInheritableMetadataKeys are irrelevant.
+      nonInheritableMetadataKeys = Nil
+    } else if (child.explicitMetadata.isDefined) {
+      // Inner alias is explicit; remove any outer non-inherits.
+      // We don't need nonInheritableMetadataKeys anymore.
+      explicitMetadata = Some(alias.metadata)
+      nonInheritableMetadataKeys = Nil
+    } else {
+      // Both are deriving. Union the nonInheritableMetadataKeys.
+      val nonInheritSet = nonInheritableMetadataKeys.toSet
+      nonInheritableMetadataKeys = nonInheritableMetadataKeys ++
+        child.nonInheritableMetadataKeys.filterNot(nonInheritSet)
+    }
+    val res = CurrentOrigin.withOrigin(alias.origin) {
+      alias.copy(child = child.child)(
+        exprId = alias.exprId,
+        qualifier = alias.qualifier,
+        explicitMetadata = explicitMetadata,
+        nonInheritableMetadataKeys = nonInheritableMetadataKeys)
+    }
+    res.copyTagsFrom(alias)
+    res
+  }
 }
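A rough illustration of the merge rules above (a hypothetical test-only object, needed because mergeAliases is protected to AliasHelper; attribute names and metadata are made up):

import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, AttributeReference}
import org.apache.spark.sql.types.{IntegerType, Metadata}

object MergeDemo extends AliasHelper {
  def demo(): Unit = {
    val x = AttributeReference("x", IntegerType)()
    // Inner alias pins its metadata explicitly; outer alias derives from its child.
    val inner = Alias(x, "a")(explicitMetadata = Some(Metadata.empty))
    val outer = Alias(inner, "b")()
    val merged = mergeAliases(outer)
    // The stack collapses to Alias(x, "b"), keeping the outer name and exprId
    // and pinning the outer alias's effective metadata explicitly (rule two above).
    assert(merged.child eq x)
    assert(merged.name == "b" && merged.exprId == outer.exprId)
    assert(merged.explicitMetadata == Some(outer.metadata))
  }
}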

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 5 additions & 0 deletions
@@ -255,6 +255,11 @@ case class Alias(child: Expression, name: String)(
     s"${child.sql} AS $qualifierPrefix${quoteIfNeeded(name)}"
   }
 
+  // Copying this alias with a new child expression.
+  def withNewChild(newChild: Expression): Alias = {
+    withNewChildInternal(newChild)
+  }
+
   override protected def withNewChildInternal(newChild: Expression): Alias =
     copy(child = newChild)(exprId, qualifier, explicitMetadata, nonInheritableMetadataKeys)
 }
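A small usage sketch of the new public wrapper (illustrative literals):

import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}

val a = Alias(Literal(1), "one")()
val b = a.withNewChild(Literal(2))
// Only the child changes; name, exprId, qualifier, and metadata settings carry over.
assert(b.name == a.name && b.exprId == a.exprId)
assert(b.child == Literal(2))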

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 23 additions & 5 deletions
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
+import org.apache.spark.sql.catalyst.util.AUTO_GENERATED_ALIAS
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -619,11 +620,28 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
       // If the alias name is different from attribute name, we can't strip it either, or we
       // may accidentally change the output schema name of the root plan.
       case a @ Alias(attr: Attribute, name)
-        if (a.metadata == attr.metadata) &&
-          name == attr.name &&
-          !excludeList.contains(attr) &&
-          !excludeList.contains(a) =>
-        attr
+          if !excludeList.contains(attr) &&
+            !excludeList.contains(a) &&
+            name == attr.name =>
+
+        val metadata = a.metadata
+        var attrMetadata = attr.metadata
+        if (metadata == attrMetadata) {
+          // The alias is truly redundant, remove it.
+          attr
+        } else if (attr.metadata.contains(AUTO_GENERATED_ALIAS)) {
+          attrMetadata = attr.metadata.withKeyRemoved(AUTO_GENERATED_ALIAS)
+          if (metadata == attrMetadata) {
+            // The AUTO_GENERATED_ALIAS is not propagating to a view, so it is ok to remove it.
+            // With that key removed, the alias is now redundant, remove it.
+            attr.withMetadata(metadata)
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+
       case a => a
     }
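To see why the AUTO_GENERATED_ALIAS escape hatch is safe, a sketch of the comparison the rule now performs (the marker value "true" is an assumption; only the key matters):

import org.apache.spark.sql.catalyst.util.AUTO_GENERATED_ALIAS
import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

// An attribute whose only extra metadata key is the auto-generated-alias marker.
val attrMetadata = new MetadataBuilder()
  .putString(AUTO_GENERATED_ALIAS, "true")
  .build()

// Once the marker is stripped, the metadata matches the alias's (empty)
// metadata, so the alias is redundant and can be removed.
val stripped = attrMetadata.withKeyRemoved(AUTO_GENERATED_ALIAS)
assert(stripped == Metadata.empty)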

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala

Lines changed: 5 additions & 3 deletions
@@ -332,7 +332,7 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
       ArrayType(new StructType().add("y", "int").add("x", "byte")),
       hasTransform = true)
 
-    withSQLConf("spark.sql.preserveCharVarcharTypeInfo" -> "true") {
+    withSQLConf(SQLConf.PRESERVE_CHAR_VARCHAR_TYPE_INFO.key -> "true") {
       // exact match on VARCHAR does not need transform
       assertArrayField(ArrayType(VarcharType(7)), ArrayType(VarcharType(7)), hasTransform = false)
       // VARCHAR length increase could avoid transform
@@ -512,7 +512,8 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val y = query.output.last
 
     val parsedPlan = byName(table, query)
-    val expectedPlan = byName(table, Project(Seq(Alias(X, "x")(), y), query))
+    val expectedPlan = byName(table,
+      Project(Seq(Alias(X, "x")(), Alias(y, y.name)()), query))
 
     assertNotResolved(parsedPlan)
     checkAnalysis(parsedPlan, expectedPlan, caseSensitive = false)
@@ -529,7 +530,8 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val x = query.output.last
 
     val parsedPlan = byName(table, query)
-    val expectedPlan = byName(table, Project(Seq(x, y), query))
+    val expectedPlan = byName(table,
+      Project(Seq(Alias(x, x.name)(), Alias(y, y.name)()), query))
 
     assertNotResolved(parsedPlan)
     checkAnalysis(parsedPlan, expectedPlan)

sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out

Lines changed: 28 additions & 24 deletions
@@ -57,14 +57,15 @@ WITH s AS (SELECT 43 AS col)
 INSERT INTO cte_tbl SELECT * FROM S
 -- !query analysis
 InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
-+- WithCTE
-   :- CTERelationDef xxxx, false
-   :  +- SubqueryAlias s
-   :     +- Project [43 AS col#x]
-   :        +- OneRowRelation
-   +- Project [col#x]
-      +- SubqueryAlias S
-         +- CTERelationRef xxxx, true, [col#x], false, false, 1, false
++- Project [col#x AS col#x]
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias s
+      :     +- Project [43 AS col#x]
+      :        +- OneRowRelation
+      +- Project [col#x]
+         +- SubqueryAlias S
+            +- CTERelationRef xxxx, true, [col#x], false, false, 1, false
 
 
 -- !query
@@ -79,14 +80,15 @@ Project [col#x]
 INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s
 -- !query analysis
 InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
-+- WithCTE
-   :- CTERelationDef xxxx, false
-   :  +- SubqueryAlias s
-   :     +- Project [44 AS col#x]
-   :        +- OneRowRelation
-   +- Project [col#x]
-      +- SubqueryAlias s
-         +- CTERelationRef xxxx, true, [col#x], false, false, 1, false
++- Project [col#x AS col#x]
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias s
+      :     +- Project [44 AS col#x]
+      :        +- OneRowRelation
+      +- Project [col#x]
+         +- SubqueryAlias s
+            +- CTERelationRef xxxx, true, [col#x], false, false, 1, false
 
 
 -- !query
@@ -111,15 +113,17 @@ INSERT INTO cte_tbl2 SELECT col
 -- !query analysis
 Union false, false
 :- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
-:  +- Project [col#x]
-:     +- SubqueryAlias s
-:        +- Project [45 AS col#x]
-:           +- OneRowRelation
+:  +- Project [col#x AS col#x]
+:     +- Project [col#x]
+:        +- SubqueryAlias s
+:           +- Project [45 AS col#x]
+:              +- OneRowRelation
 +- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl2, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl2], Append, `spark_catalog`.`default`.`cte_tbl2`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl2), [col]
-   +- Project [col#x]
-      +- SubqueryAlias s
-         +- Project [45 AS col#x]
-            +- OneRowRelation
+   +- Project [col#x AS col#x]
+      +- Project [col#x]
+         +- SubqueryAlias s
+            +- Project [45 AS col#x]
+               +- OneRowRelation
 
 
 -- !query
