Skip to content

Conversation

pan3793
Copy link
Member

@pan3793 pan3793 commented Sep 27, 2025

What changes were proposed in this pull request?

This PR fixes a bug in plannedWrite, where the query has a literal output of the partition column.

CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k);

INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i;

The evaluation of FileFormatWriter.orderingMatched fails because SortOrder(Literal) is eliminated by EliminateSorts.

The idea is to expose and keep "constant order" expressions from child.outputOrdering

Why are the changes needed?

V1Writes will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect.

SPARK-53707 fixes the issue accidentally(and partially) by adding a Project of query in V1Writes.

Before SPARK-53707

Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false
+- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282]
   +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet

After SPARK-53707

Project [i#284, j#285, 0 AS k#290]
+- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false
   +- Project [i#284, j#285]
      +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet

This PR fixes the issue thoroughly, with a new UT added.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UT is added.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Sep 27, 2025
val literalColumns = queryOutput.flatMap { ne => isLiteral(ne, ne.name) }

// We should first sort by dynamic partition columns, then bucket id, and finally sorting
// columns, then drop literal columns
Copy link
Member Author

@pan3793 pan3793 Sep 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, SortOrder(Literal) has been eliminated by EliminateSorts

Copy link
Contributor

@peter-toth peter-toth Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you basically change requiredOrdering and drop those columns that are defined as literals in outputExpressions of the top OrderPreservingUnaryExecNode node. But what if a literal definition is not in the top node?

Shouldn't we do the other way around and fix actualOrdering? I mean if we have a query:

+- Project [i, j, 0 AS k]
   + ...
      +- Sort [i]
         +- Relation

Then shouldn't actualOrdering (outputOrdering of the Project) be Seq(SortOrder(k), SortOrder(i)) as we know that k is a constant? I.e. Project could prepend its contants to the alias transformed child.outputOrdering.

And similarly, when we have:

+- Sort [i]
   + ...
      +- Project [i, j, 0 AS k]
         +- Relation

Then shoudn't outputOrdering of the Project node be Seq(SortOrder(k)), and outputOrdering of the Sort be Seq(SortOrder(k), SortOrder(i))? I.e. Project could somehow mark that SortOrder(k) as "constant order", and Sort should just extend "constant order" expressions from child.outputOrdering with the new order expressions (i).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peter-toth thanks for your tips, that sounds reasonable, and I have updated the code in this approach, it's effective both w/ and w/o SPARK-53707

@pan3793
Copy link
Member Author

pan3793 commented Sep 27, 2025

cc @ulysses-you @peter-toth

@pan3793 pan3793 marked this pull request as ready for review September 27, 2025 04:47
@github-actions github-actions bot added the BUILD label Sep 27, 2025

val listener = new QueryExecutionListener {
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
val conf = qe.sparkSession.sessionState.conf
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bugfix, the listener runs in another thread, without this change, conf.getConf actually gets conf from the thread local, thus may cause issues on concurrency running tests

@pan3793
Copy link
Member Author

pan3793 commented Sep 28, 2025

cc @cloud-fan, could you please take a look?

@peter-toth
Copy link
Contributor

peter-toth commented Sep 28, 2025

@pan3793 , I can take a deeper look at this PR tomorrow, but the OrderPreservingUnaryNode related changes look a bit unexpected to me at first sight. Can you please elaborate on why are those needed?

@pan3793 pan3793 marked this pull request as draft September 28, 2025 16:32
@pan3793
Copy link
Member Author

pan3793 commented Sep 29, 2025

@peter-toth Let me take this example to explain what I'm trying to do,

CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k);

INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i;

In V1Writes.prepareQuery, the query looks like

Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false
+- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282]
   +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet

and query.outputOrdering is [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], while requiredOrdering is [k#282 ASC NULLS FIRST], thus orderingMatched will be false, then Sort(requiredOrdering, global = false, empty2NullPlan) will be added on top.

the idea is to leverage the alias information in Sort to make outputOrdering knows 0 is alias of k, thus outputOrdering can satisfy requiredOrdering.


BUT, when I debugged it last night, I found the issue had gone magically, and in V1Writes.prepareQuery, the query looked like:

Project [i#284, j#285, 0 AS k#290]
+- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false
   +- Project [i#284, j#285]
      +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet

After some investigation, I found this was accidentally fixed by SPARK-53707 (#52449), which got merged just a few days ago (I happened to start constructing the UT before it got in ...), it fixes the issue by adding a Project on the Sort, in the PreprocessTableInsertion rule.

Note: the physics plan change in this PR is still required to satisfy the UT.

Now, I'm not sure if this is still an issue ...

@pan3793 pan3793 marked this pull request as ready for review September 29, 2025 10:11
@peter-toth
Copy link
Contributor

peter-toth commented Sep 29, 2025

I see, thanks for the details @pan3793.

I feel that a more comprehensive fix would be to not change requiredOrdering but deal with constant expressions in outputOrdering of both Project and Sort: #52474 (comment)

But this is a complex topic, so @ulysses-you or @cloud-fan or others might have better ideas.

newOrdering.takeWhile(_.isDefined).flatten.toSeq ++ outputExpressions.filter {
case Alias(child, _) => child.foldable
case expr => expr.foldable
}.map(SortOrder(_, Ascending).copy(isConstant = true))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, do we need to add the whole Alias to SortOrder expression or could adding only the generated attribute work?
Also, I wonder if it would be a breaking change to add Constant as a new SortDirection instead of using a boolean flag?

Copy link
Member Author

@pan3793 pan3793 Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peter-toth the Constant SortDirection sounds like a good idea, I have updated the code to use it.

while I have tried to add only the generated attribute (I left the code as comments)

    newOrdering.takeWhile(_.isDefined).flatten.toSeq ++ outputExpressions.flatMap {
      case alias @ Alias(child, _) if child.foldable =>
        Some(SortOrder(alias.toAttribute, Constant))
      case expr if expr.foldable =>
        Some(SortOrder(expr, Constant))
      case _ => None
    }

there are two tests fail (haven't figured out the root cause)

[info] CachedTableSuite:
...
[info] - SPARK-36120: Support cache/uncache table with TimestampNTZ type *** FAILED *** (43 milliseconds)
[info]   AttributeSet(TIMESTAMP_NTZ '2021-01-01 00:00:00'#17739) was not empty The optimized logical plan has missing inputs:
[info]   InMemoryRelation [TIMESTAMP_NTZ '2021-01-01 00:00:00'#17776], StorageLevel(disk, memory, deserialized, 1 replicas)
[info]      +- *(1) Project [2021-01-01 00:00:00 AS TIMESTAMP_NTZ '2021-01-01 00:00:00'#17739]
[info]         +- *(1) Scan OneRowRelation[] (QueryTest.scala:241)
...
[info] - SPARK-52692: Support cache/uncache table with Time type *** FAILED *** (58 milliseconds)
[info]   AttributeSet(TIME '22:00:00'#18852) was not empty The optimized logical plan has missing inputs:
[info]   InMemoryRelation [TIME '22:00:00'#18889], StorageLevel(disk, memory, deserialized, 1 replicas)
[info]      +- *(1) Project [22:00:00 AS TIME '22:00:00'#18852]
[info]         +- *(1) Scan OneRowRelation[] (QueryTest.scala:241)
...

Copy link
Contributor

@peter-toth peter-toth Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem seems to be that InMemoryRelation.withOutput() doesn't remap outputOrdering. And because outputOrdering is present in InMemoryRelation as case class argument the unmapped ordering attributes are considered missing inputs.

This seems to be another hidden issue with InMemoryRelation.outputOrdering and got exposed with this change.

Copy link
Contributor

@peter-toth peter-toth Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened a small PR into this PR: pan3793#2, hopefully it helps fixing the above tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peter-toth Many thanks for your professionalism and patience! I tested locally, and it did fix the issue. Have educated a lot from your review.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It a pleasure working with you @pan3793!

@peter-toth
Copy link
Contributor

peter-toth commented Sep 30, 2025

Thank you @pan3793. I like the new approach, just have a minor suggestions.
Can you please leave some comments in SortOrder to describe the new flag/ordering + update the PR description how constant order and its propagation solves the issue?


override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = {
val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation]
copied.statsOfPlanToCache = this.statsOfPlanToCache
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel this is a hidden bug just exposed by this change.

@peter-toth
Copy link
Contributor

We should adjust SortOrder.orderingSatisfies() too.

InMemoryRelation.withOutput fix
@pan3793
Copy link
Member Author

pan3793 commented Oct 1, 2025

Suppose all code issues are fixed, let's wait for another round CI, I will update the comment soon.

@peter-toth
Copy link
Contributor

cc @cloud-fan , @ulysses-you

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants