
Commit 3c94b46

[SPARK-53707] Improve attribute metadata handling.
thetasketch.sql.out had outdated expected output from the recursive SQL changes; fixing it too.
1 parent cb28340 commit 3c94b46

File tree: 18 files changed (+246, -121 lines)

sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
Lines changed: 18 additions & 0 deletions

@@ -90,6 +90,24 @@ sealed class Metadata private[types] (private[types] val map: Map[String, Any])
   /** Gets a Metadata array. */
   def getMetadataArray(key: String): Array[Metadata] = get(key)

+  /** Return a copy with the keys removed */
+  def withKeysRemoved(keysToRemove: Seq[String]): Metadata = {
+    if (keysToRemove.isEmpty) {
+      this
+    } else {
+      new Metadata(this.map -- keysToRemove)
+    }
+  }
+
+  /** Return a copy with a key removed */
+  def withKeyRemoved(keyToRemove: String): Metadata = {
+    if (map.contains(keyToRemove)) {
+      new Metadata(map - keyToRemove)
+    } else {
+      this
+    }
+  }
+
   /** Converts to its JSON representation. */
   def json: String = compact(render(jsonValue))
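Both helpers fall back to returning `this` when there is nothing to remove (an absent key, or an empty key list), so the no-op path costs nothing. A minimal usage sketch built on the existing MetadataBuilder API (the key names here are invented for illustration):

  import org.apache.spark.sql.types.MetadataBuilder

  val meta = new MetadataBuilder()
    .putString("comment", "user-facing comment")
    .putString("__internal_marker", "true") // hypothetical internal key
    .build()

  // Removing a present key returns a copy without it.
  val cleaned = meta.withKeyRemoved("__internal_marker")
  assert(!cleaned.contains("__internal_marker"))

  // Removing an absent key, or an empty key list, returns `this` as-is.
  assert(meta.withKeyRemoved("no_such_key") eq meta)
  assert(meta.withKeysRemoved(Seq.empty) eq meta)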

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
Lines changed: 24 additions & 6 deletions

@@ -133,7 +133,8 @@ object TableOutputResolver extends SQLConfHelper with Logging {
       val canWriteExpr = canWrite(
         tableName, valueType, colType, byName = true, conf, addError, colPath)
       if (canWriteExpr) {
-        applyColumnMetadata(checkNullability(value, col, conf, colPath), col)
+        val nullsHandled = checkNullability(value, col, conf, colPath)
+        applyColumnMetadata(nullsHandled, col)
       } else {
         value
       }
@@ -222,12 +223,29 @@ object TableOutputResolver extends SQLConfHelper with Logging {
     val requiredMetadata = CharVarcharUtils.cleanMetadata(column.metadata)

     // Make sure that the result has the requiredMetadata and only that.
-    // If the expr is an Attribute or NamedLambdaVariable with the proper name and metadata,
-    // it should remain stable, but we do not trust that other NamedAttributes will
-    // remain stable (namely Alias).
+    //
+    // If the expr is a NamedLambdaVariable, it must be from our handling of structured
+    // array or map fields; the Alias will be added on the outer structured value.
+    //
+    // Even an Attribute with the proper name and metadata is not enough to prevent
+    // source query metadata leaking to the Write after rewrites, i.e.:
+    //   case a: Attribute if a.name == column.name && a.metadata == requiredMetadata => a
+    //
+    // The problem is that an Attribute can be replaced by what it refers to, for example:
+    //   Project AttrRef(metadata={}, exprId=2)
+    //   Project Alias(
+    //     cast(AttrRef(metadata={source_field_default_value}, exprId=1) as same_type),
+    //     exprId=2,
+    //     explicitMetadata=None) -- metadata.isEmpty
+    // gets rewritten to:
+    //   Project Alias(
+    //     AttrRef(metadata={source_field_default_value}, exprId=1),
+    //     exprId=2,
+    //     explicitMetadata=None) -- metadata.nonEmpty !!
+    //
+    // So we always add an Alias(expr, name, explicitMetadata = Some(requiredMetadata))
+    // to prevent expr from leaking the source query metadata into the Write.
     expr match {
-      case a: Attribute if a.name == column.name && a.metadata == requiredMetadata =>
-        a
       case v: NamedLambdaVariable if v.name == column.name && v.metadata == requiredMetadata =>
         v
       case _ =>
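The upshot: apart from lambda variables, the resolver no longer trusts any expression, Attributes included, to keep column metadata stable across rewrites, so it always pins the metadata with an explicit Alias. A condensed sketch of that final match, paraphrased from the diff (pinMetadata is a hypothetical standalone name, not the actual method):

  import org.apache.spark.sql.catalyst.expressions.{
    Alias, Expression, NamedExpression, NamedLambdaVariable}
  import org.apache.spark.sql.types.Metadata

  def pinMetadata(expr: Expression, name: String, requiredMetadata: Metadata): NamedExpression =
    expr match {
      // Lambda variables only appear in the structured array/map handling;
      // the enclosing structured value gets the Alias instead.
      case v: NamedLambdaVariable if v.name == name && v.metadata == requiredMetadata =>
        v
      // Everything else is wrapped: a rewrite may later substitute an Attribute
      // with an expression carrying source-query metadata.
      case _ =>
        Alias(expr, name)(explicitMetadata = Some(requiredMetadata))
    }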

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AliasResolver.scala
Lines changed: 7 additions & 16 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.analysis.resolver
 import org.apache.spark.sql.catalyst.analysis.{AliasResolution, UnresolvedAlias}
 import org.apache.spark.sql.catalyst.expressions.{
   Alias,
+  AliasHelper,
   Expression,
   NamedExpression,
   OuterReference
@@ -31,7 +32,8 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
  */
 class AliasResolver(expressionResolver: ExpressionResolver)
   extends TreeNodeResolver[UnresolvedAlias, Expression]
-  with ResolvesExpressionChildren {
+  with ResolvesExpressionChildren
+  with AliasHelper {
   private val scopes = expressionResolver.getNameScopes
   private val expressionResolutionContextStack =
     expressionResolver.getExpressionResolutionContextStack
@@ -115,30 +117,19 @@ class AliasResolver(expressionResolver: ExpressionResolver)
    *
    *   Project[
    *     Alias("alias_2")(
-   *       Alias("alias_1")(id)
-   *     )
+   *       Alias("alias_1")(id1)
+   *     )(id2)
    *   ]( ... )
    *
    * and after the `collapseAlias` call (removing the bottom one) it would be:
    *
    *   Project[
-   *     Alias("alias_2")(id)
+   *     Alias("alias_2")(id2)
    *   ]( ... )
    */
   private def collapseAlias(alias: Alias): Alias =
     alias.child match {
-      case innerAlias: Alias =>
-        val metadata = if (alias.metadata.isEmpty) {
-          None
-        } else {
-          Some(alias.metadata)
-        }
-        alias.copy(child = innerAlias.child)(
-          exprId = alias.exprId,
-          qualifier = alias.qualifier,
-          explicitMetadata = metadata,
-          nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys
-        )
+      case _: Alias => mergeAliases(alias)
       case _ => alias
     }
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala
Lines changed: 69 additions & 14 deletions

@@ -17,6 +17,8 @@

 package org.apache.spark.sql.catalyst.expressions

+import scala.annotation.tailrec
+
 import org.apache.spark.sql.catalyst.analysis.MultiAlias
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project}
@@ -68,8 +70,8 @@ trait AliasHelper {
    * but keep the name of the outermost attribute.
    */
   protected def replaceAliasButKeepName(
-    expr: NamedExpression,
-    aliasMap: AttributeMap[Alias]): NamedExpression = {
+      expr: NamedExpression,
+      aliasMap: AttributeMap[Alias]): NamedExpression = {
     expr match {
       // We need to keep the `Alias` if we replace a top-level Attribute, so that it's still a
       // `NamedExpression`. We also need to keep the name of the original Attribute.
@@ -90,29 +92,82 @@ trait AliasHelper {
       case a: Alias if a.metadata != Metadata.empty => a
       case other => trimAliases(other)
     }
-    case a @ Alias(child, _) => trimAliases(child)
+    case Alias(child, _) => trimAliases(child)
     case MultiAlias(child, _) => trimAliases(child)
     case other => other.mapChildren(trimAliases)
   }

   protected def trimNonTopLevelAliases[T <: Expression](e: T): T = {
     val res = CurrentOrigin.withOrigin(e.origin) {
       e match {
-        case a: Alias =>
-          // Preserve the _effective_ metadata.
-          a.copy(child = trimAliases(a.child))(
-            exprId = a.exprId,
-            qualifier = a.qualifier,
-            explicitMetadata = Some(a.metadata),
-            nonInheritableMetadataKeys = Nil)
-        case a: MultiAlias =>
-          a.copy(child = trimAliases(a.child))
+        case a: Alias => mergeAndTrimAliases(a)
+        case a: MultiAlias => a.copy(child = trimAliases(a.child))
         case other => trimAliases(other)
       }
     }
-
     res.copyTagsFrom(e)
-
     res.asInstanceOf[T]
   }
+
+  /**
+   * Merges any stack of aliases under the top-level alias, then drops any
+   * aliases deeper in the expression tree.
+   * So Alias1(Alias2(Alias3(Foo(Alias4(x))))) becomes
+   *   Alias5(Foo(x))
+   * where Alias5 preserves the metadata of Alias{1,2,3}
+   * and the name and exprId of Alias1.
+   * Alias4 is simply removed.
+   */
+  @tailrec
+  protected final def mergeAndTrimAliases(alias: Alias): Alias = {
+    alias.child match {
+      case _: Alias => mergeAndTrimAliases(mergeAliases(alias))
+      case other => alias.withNewChild(trimAliases(other))
+    }
+  }
+
+  /**
+   * Merges an Alias(Alias(x)) into Alias(x), preserving metadata.
+   *
+   * If the outer alias has explicit metadata, it is preserved.
+   * Else, if the inner alias has explicit metadata, the result gets
+   * the effective outer metadata pinned as explicit.
+   * Else, both aliases derive their metadata: the result derives its
+   * metadata too, with the union of the non-inheritable keys.
+   *
+   * @param alias An Alias with a child Alias, Alias(Alias(x))
+   * @return The merged alias, Alias(x)
+   */
+  protected final def mergeAliases(alias: Alias): Alias = {
+    val child = alias.child.asInstanceOf[Alias]
+    var explicitMetadata = alias.explicitMetadata
+    var nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys
+
+    if (explicitMetadata.isDefined) {
+      // Outer alias is explicit; we can ignore inner metadata.
+      // The outer nonInheritableMetadataKeys are irrelevant.
+      nonInheritableMetadataKeys = Nil
+    } else if (child.explicitMetadata.isDefined) {
+      // Inner alias is explicit; remove any outer non-inherits.
+      // We don't need nonInheritableMetadataKeys anymore.
+      explicitMetadata = Some(alias.metadata)
+      nonInheritableMetadataKeys = Nil
+    } else {
+      // Both are deriving. Union the nonInheritableMetadataKeys.
+      val nonInheritSet = nonInheritableMetadataKeys.toSet
+      nonInheritableMetadataKeys = nonInheritableMetadataKeys ++
+        child.nonInheritableMetadataKeys.filterNot(nonInheritSet)
+    }
+    val res = CurrentOrigin.withOrigin(alias.origin) {
+      alias.copy(child = child.child)(
+        exprId = alias.exprId,
+        qualifier = alias.qualifier,
+        explicitMetadata = explicitMetadata,
+        nonInheritableMetadataKeys = nonInheritableMetadataKeys)
+    }
+    res.copyTagsFrom(alias)
+    res
+  }
 }
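The three branches in mergeAliases encode a simple precedence: explicit outer metadata wins outright, an explicit inner alias gets the effective outer metadata pinned as explicit, and only when both aliases derive their metadata are the non-inheritable keys unioned. A small sketch of the derived/derived case (inner, outer, and the key names are illustrative; mergeAliases itself is protected inside the trait, so this only shows the inputs and the expected result):

  import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
  import org.apache.spark.sql.types.IntegerType

  val x = AttributeReference("x", IntegerType)()
  val inner = Alias(x, "a")(nonInheritableMetadataKeys = Seq("k1"))
  val outer = Alias(inner, "b")(nonInheritableMetadataKeys = Seq("k2"))

  // Neither alias has explicitMetadata, so mergeAliases(outer) yields an
  // Alias(x, "b") that keeps outer's exprId and qualifier, still derives its
  // metadata, and carries nonInheritableMetadataKeys == Seq("k2", "k1").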

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
Lines changed: 5 additions & 0 deletions

@@ -255,6 +255,11 @@ case class Alias(child: Expression, name: String)(
     s"${child.sql} AS $qualifierPrefix${quoteIfNeeded(name)}"
   }

+  // Copying this alias with a new child expression.
+  def withNewChild(newChild: Expression): Alias = {
+    withNewChildInternal(newChild)
+  }
+
   override protected def withNewChildInternal(newChild: Expression): Alias =
     copy(child = newChild)(exprId, qualifier, explicitMetadata, nonInheritableMetadataKeys)
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
Lines changed: 23 additions & 5 deletions

@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
+import org.apache.spark.sql.catalyst.util.AUTO_GENERATED_ALIAS
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -619,11 +620,28 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
       // If the alias name is different from attribute name, we can't strip it either, or we
       // may accidentally change the output schema name of the root plan.
       case a @ Alias(attr: Attribute, name)
-        if (a.metadata == attr.metadata) &&
-          name == attr.name &&
-          !excludeList.contains(attr) &&
-          !excludeList.contains(a) =>
-        attr
+        if !excludeList.contains(attr) &&
+          !excludeList.contains(a) &&
+          name == attr.name =>
+
+        val metadata = a.metadata
+        var attrMetadata = attr.metadata
+        if (metadata == attrMetadata) {
+          // The alias is truly redundant, remove it.
+          attr
+        } else if (attr.metadata.contains(AUTO_GENERATED_ALIAS)) {
+          attrMetadata = attr.metadata.withKeyRemoved(AUTO_GENERATED_ALIAS)
+          if (metadata == attrMetadata) {
+            // The AUTO_GENERATED_ALIAS is not propagating to a view, so it is ok to remove it.
+            // With that key removed, the alias is now redundant, remove it.
+            attr.withMetadata(metadata)
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+
       case a => a
     }
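The rule now treats AUTO_GENERATED_ALIAS as noise when judging redundancy: if stripping that one key from the attribute makes the metadata match, the alias can still be removed, and the surviving attribute adopts the alias's metadata. A condensed sketch of just this decision (stripIfRedundant is a hypothetical helper; the real rule also checks the exclude list and the name first):

  import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
  import org.apache.spark.sql.catalyst.util.AUTO_GENERATED_ALIAS

  def stripIfRedundant(a: Alias, attr: Attribute): NamedExpression =
    if (a.metadata == attr.metadata) {
      attr // truly redundant
    } else if (attr.metadata.contains(AUTO_GENERATED_ALIAS) &&
        a.metadata == attr.metadata.withKeyRemoved(AUTO_GENERATED_ALIAS)) {
      attr.withMetadata(a.metadata) // redundant once the auto-generated marker is dropped
    } else {
      a // metadata genuinely differs; keep the alias
    }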

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
Lines changed: 5 additions & 3 deletions

@@ -332,7 +332,7 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
       ArrayType(new StructType().add("y", "int").add("x", "byte")),
       hasTransform = true)

-    withSQLConf("spark.sql.preserveCharVarcharTypeInfo" -> "true") {
+    withSQLConf(SQLConf.PRESERVE_CHAR_VARCHAR_TYPE_INFO.key -> "true") {
       // exact match on VARCHAR does not need transform
       assertArrayField(ArrayType(VarcharType(7)), ArrayType(VarcharType(7)), hasTransform = false)
       // VARCHAR length increase could avoid transform
@@ -512,7 +512,8 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val y = query.output.last

     val parsedPlan = byName(table, query)
-    val expectedPlan = byName(table, Project(Seq(Alias(X, "x")(), y), query))
+    val expectedPlan = byName(table,
+      Project(Seq(Alias(X, "x")(), Alias(y, y.name)()), query))

     assertNotResolved(parsedPlan)
     checkAnalysis(parsedPlan, expectedPlan, caseSensitive = false)
@@ -529,7 +530,8 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val x = query.output.last

     val parsedPlan = byName(table, query)
-    val expectedPlan = byName(table, Project(Seq(x, y), query))
+    val expectedPlan = byName(table,
+      Project(Seq(Alias(x, x.name)(), Alias(y, y.name)()), query))

     assertNotResolved(parsedPlan)
     checkAnalysis(parsedPlan, expectedPlan)

sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out
Lines changed: 28 additions & 24 deletions

@@ -57,14 +57,15 @@ WITH s AS (SELECT 43 AS col)
 INSERT INTO cte_tbl SELECT * FROM S
 -- !query analysis
 InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
-+- WithCTE
-   :- CTERelationDef xxxx, false
-   :  +- SubqueryAlias s
-   :     +- Project [43 AS col#x]
-   :        +- OneRowRelation
-   +- Project [col#x]
-      +- SubqueryAlias S
-         +- CTERelationRef xxxx, true, [col#x], false, false, 1, false
++- Project [col#x AS col#x]
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias s
+      :     +- Project [43 AS col#x]
+      :        +- OneRowRelation
+      +- Project [col#x]
+         +- SubqueryAlias S
+            +- CTERelationRef xxxx, true, [col#x], false, false, 1, false


 -- !query
@@ -79,14 +80,15 @@ Project [col#x]
 INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s
 -- !query analysis
 InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
-+- WithCTE
-   :- CTERelationDef xxxx, false
-   :  +- SubqueryAlias s
-   :     +- Project [44 AS col#x]
-   :        +- OneRowRelation
-   +- Project [col#x]
-      +- SubqueryAlias s
-         +- CTERelationRef xxxx, true, [col#x], false, false, 1, false
++- Project [col#x AS col#x]
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias s
+      :     +- Project [44 AS col#x]
+      :        +- OneRowRelation
+      +- Project [col#x]
+         +- SubqueryAlias s
+            +- CTERelationRef xxxx, true, [col#x], false, false, 1, false


 -- !query
@@ -111,15 +113,17 @@ INSERT INTO cte_tbl2 SELECT col
 -- !query analysis
 Union false, false
 :- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
-:  +- Project [col#x]
-:     +- SubqueryAlias s
-:        +- Project [45 AS col#x]
-:           +- OneRowRelation
+:  +- Project [col#x AS col#x]
+:     +- Project [col#x]
+:        +- SubqueryAlias s
+:           +- Project [45 AS col#x]
+:              +- OneRowRelation
 +- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl2, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl2], Append, `spark_catalog`.`default`.`cte_tbl2`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl2), [col]
-   +- Project [col#x]
-      +- SubqueryAlias s
-         +- Project [45 AS col#x]
-            +- OneRowRelation
+   +- Project [col#x AS col#x]
+      +- Project [col#x]
+         +- SubqueryAlias s
+            +- Project [45 AS col#x]
+               +- OneRowRelation


 -- !query
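These golden-file updates are the visible effect of the TableOutputResolver change above: the source query of every INSERT now gets a metadata-pinning projection on top (the extra Project [col#x AS col#x]), so the write sees exactly the table column's metadata rather than anything inherited from the source query.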
