-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53738][SQL] PlannedWrite should preserve custom sort order when query output contains literal #52474
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
val literalColumns = queryOutput.flatMap { ne => isLiteral(ne, ne.name) } | ||
|
||
// We should first sort by dynamic partition columns, then bucket id, and finally sorting | ||
// columns, then drop literal columns |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, SortOrder(Literal)
has been eliminated by EliminateSorts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So you basically change requiredOrdering
and drop those columns that are defined as literals in outputExpressions
of the top OrderPreservingUnaryExecNode
node. But what if a literal definition is not in the top node?
Shouldn't we do the other way around and fix actualOrdering
? I mean if we have a query:
+- Project [i, j, 0 AS k]
+ ...
+- Sort [i]
+- Relation
Then shouldn't actualOrdering
(outputOrdering
of the Project
) be Seq(SortOrder(k), SortOrder(i))
as we know that k
is a constant? I.e. Project
could prepend its contants to the alias transformed child.outputOrdering
.
And similarly, when we have:
+- Sort [i]
+ ...
+- Project [i, j, 0 AS k]
+- Relation
Then shoudn't outputOrdering
of the Project
node be Seq(SortOrder(k))
, and outputOrdering
of the Sort
be Seq(SortOrder(k), SortOrder(i))
? I.e. Project
could somehow mark that SortOrder(k)
as "constant order", and Sort
should just extend "constant order" expressions from child.outputOrdering
with the new order expressions (i
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth thanks for your tips, that sounds reasonable, and I have updated the code in this approach, it's effective both w/ and w/o SPARK-53707
…n query output contains literal
9b549cd
to
a0aa9f4
Compare
|
||
val listener = new QueryExecutionListener { | ||
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { | ||
val conf = qe.sparkSession.sessionState.conf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a bugfix, the listener runs in another thread, without this change, conf.getConf
actually gets conf
from the thread local, thus may cause issues on concurrency running tests
cc @cloud-fan, could you please take a look? |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
Outdated
Show resolved
Hide resolved
...alyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
Outdated
Show resolved
Hide resolved
@pan3793 , I can take a deeper look at this PR tomorrow, but the |
@peter-toth Let me take this example to explain what I'm trying to do,
In
and the idea is to leverage the alias information in BUT, when I debugged it last night, I found the issue had gone magically, and in
After some investigation, I found this was accidentally fixed by SPARK-53707 (#52449), which got merged just a few days ago (I happened to start constructing the UT before it got in ...), it fixes the issue by adding a Note: the physics plan change in this PR is still required to satisfy the UT. Now, I'm not sure if this is still an issue ... |
I see, thanks for the details @pan3793. I feel that a more comprehensive fix would be to not change But this is a complex topic, so @ulysses-you or @cloud-fan or others might have better ideas. |
newOrdering.takeWhile(_.isDefined).flatten.toSeq ++ outputExpressions.filter { | ||
case Alias(child, _) => child.foldable | ||
case expr => expr.foldable | ||
}.map(SortOrder(_, Ascending).copy(isConstant = true)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, do we need to add the whole Alias
to SortOrder
expression or could adding only the generated attribute work?
Also, I wonder if it would be a breaking change to add Constant
as a new SortDirection
instead of using a boolean flag?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth the Constant SortDirection sounds like a good idea, I have updated the code to use it.
while I have tried to add only the generated attribute (I left the code as comments)
newOrdering.takeWhile(_.isDefined).flatten.toSeq ++ outputExpressions.flatMap {
case alias @ Alias(child, _) if child.foldable =>
Some(SortOrder(alias.toAttribute, Constant))
case expr if expr.foldable =>
Some(SortOrder(expr, Constant))
case _ => None
}
there are two tests fail (haven't figured out the root cause)
[info] CachedTableSuite:
...
[info] - SPARK-36120: Support cache/uncache table with TimestampNTZ type *** FAILED *** (43 milliseconds)
[info] AttributeSet(TIMESTAMP_NTZ '2021-01-01 00:00:00'#17739) was not empty The optimized logical plan has missing inputs:
[info] InMemoryRelation [TIMESTAMP_NTZ '2021-01-01 00:00:00'#17776], StorageLevel(disk, memory, deserialized, 1 replicas)
[info] +- *(1) Project [2021-01-01 00:00:00 AS TIMESTAMP_NTZ '2021-01-01 00:00:00'#17739]
[info] +- *(1) Scan OneRowRelation[] (QueryTest.scala:241)
...
[info] - SPARK-52692: Support cache/uncache table with Time type *** FAILED *** (58 milliseconds)
[info] AttributeSet(TIME '22:00:00'#18852) was not empty The optimized logical plan has missing inputs:
[info] InMemoryRelation [TIME '22:00:00'#18889], StorageLevel(disk, memory, deserialized, 1 replicas)
[info] +- *(1) Project [22:00:00 AS TIME '22:00:00'#18852]
[info] +- *(1) Scan OneRowRelation[] (QueryTest.scala:241)
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem seems to be that InMemoryRelation.withOutput() doesn't remap outputOrdering
. And because outputOrdering
is present in InMemoryRelation
as case class argument the unmapped ordering attributes are considered missing inputs.
This seems to be another hidden issue with InMemoryRelation.outputOrdering
and got exposed with this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I opened a small PR into this PR: pan3793#2, hopefully it helps fixing the above tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth Many thanks for your professionalism and patience! I tested locally, and it did fix the issue. Have educated a lot from your review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It a pleasure working with you @pan3793!
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
Outdated
Show resolved
Hide resolved
Thank you @pan3793. I like the new approach, just have a minor suggestions. |
|
||
override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = { | ||
val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation] | ||
copied.statsOfPlanToCache = this.statsOfPlanToCache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel this is a hidden bug just exposed by this change.
We should adjust |
...catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
Outdated
Show resolved
Hide resolved
InMemoryRelation.withOutput fix
Suppose all code issues are fixed, let's wait for another round CI, I will update the comment soon. |
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
Outdated
Show resolved
Hide resolved
cc @cloud-fan , @ulysses-you |
What changes were proposed in this pull request?
This PR fixes a bug in
plannedWrite
, where thequery
has a literal output of the partition column.The evaluation of
FileFormatWriter.orderingMatched
fails becauseSortOrder(Literal)
is eliminated byEliminateSorts
.The idea is to expose and keep "constant order" expressions from
child.outputOrdering
Why are the changes needed?
V1Writes
will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect.SPARK-53707 fixes the issue accidentally(and partially) by adding a
Project
of query inV1Writes
.Before SPARK-53707
After SPARK-53707
This PR fixes the issue thoroughly, with a new UT added.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UT is added.
Was this patch authored or co-authored using generative AI tooling?
No.