[FLINK-38224][table-planner] Do not convert to delta join when the sink is a retract sink #26907
…nk is a retract sink
+ ")");

- String sql = "insert into pk_snk select a,b,c from retract_src";
+ String sql = "insert into pk_upsert_snk select a,b,c from retract_src";
Some places use a,b,c while others use a, b, c. The spacing style should be unified.
@@ -545,6 +571,39 @@ class DeltaJoinTest extends TableTestBase {
    util.verifyRelPlanInsert("insert into tmp_snk select a0, a1 from src1")
  }

Another thing I am curious about is why TABLE_EXEC_SINK_UPSERT_MATERIALIZE should be set to NONE. I have observed that other tests in DeltaJoinTest.xml also have upsertMaterialize=[true].
      "insert into retract_snk select * from src1 join src2 " +
        "on src1.a1 = src2.b1 " +
        "and src1.a2 = src2.b2")
  }
Can we set TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY to FORCE and provide distinct exception messages that indicate why the conversion is not supported?
    verifyRelPlanInsert(sql);
}

@TestTemplate
void testChangelogNormalize() {
    assumeTrue(testSinkWithPk);

-   String sql = "insert into pk_snk select a,b,c from upsert_src";
+   String sql = "insert into pk_retract_snk select a,b,c from upsert_src";
upsert_src is only used here; can we move its DDL here?
void testRetractSink() {
    assumeTrue(testSinkWithPk);

    String sql = "insert into pk_retract_snk select a,b,c from retract_src";
There may be essentially no difference between testRetractSink and testChangelogNormalize: in both, pk_retract_snk causes duplicateChanges to be set to disallow. They could therefore be merged into one test, and the DDL of pk_retract_snk could be moved here.
Thanks for improving the duplicate semantics. Let me provide some comments.
What is the purpose of the change
Delta joins may emit redundant (duplicate) changes, whereas a retract sink requires changes to arrive in matched '+' and '-' pairs. We therefore should not convert a join to a delta join when the sink is a retract sink.
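To illustrate the reasoning above, here is a minimal, self-contained sketch. It is not Flink code: the `Change` class and the two `apply...` methods are hypothetical stand-ins for a retract-style sink (which counts rows, so every '+' needs its own '-') and an upsert-style sink (which overwrites by key). It only assumes that a delta join may emit the same '+' change twice.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractVsUpsertSketch {

    /** Hypothetical changelog record: kind is "+I" (insert) or "-D" (delete). */
    public static final class Change {
        final String kind;
        final String key;
        final String value;

        public Change(String kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    /** Retract-style sink (assumed model): keeps a count per row, so '+' and '-' must pair up. */
    public static Map<String, Integer> applyRetract(List<Change> changes) {
        Map<String, Integer> counts = new HashMap<>();
        for (Change c : changes) {
            String row = c.key + "=" + c.value;
            int delta = c.kind.equals("+I") ? 1 : -1;
            counts.merge(row, delta, Integer::sum);
            counts.remove(row, 0); // drop rows whose count returned to zero
        }
        return counts;
    }

    /** Upsert-style sink (assumed model): last write per key wins, so duplicates are harmless. */
    public static Map<String, String> applyUpsert(List<Change> changes) {
        Map<String, String> table = new HashMap<>();
        for (Change c : changes) {
            if (c.kind.equals("+I")) {
                table.put(c.key, c.value);
            } else {
                table.remove(c.key);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // The same insert is emitted twice, but only one delete follows.
        List<Change> withDuplicate = Arrays.asList(
                new Change("+I", "k1", "v1"),
                new Change("+I", "k1", "v1"),
                new Change("-D", "k1", "v1"));

        System.out.println("retract sink state: " + applyRetract(withDuplicate));
        System.out.println("upsert sink state:  " + applyUpsert(withDuplicate));
    }
}
```

In this sketch the retract-style sink is left with one stale copy of the row after the duplicated insert, while the upsert-style sink ends up empty, which is why the redundant output is only a problem for the former.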
Brief change log
Verifying this change
Tests are added to verify this PR.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation