Skip to content

Commit f092968

Browse files
authored
feat: support push down filter to materialized cte (#18493)
* support push down filter to materialized cte * fix missing header * fix * fix * fix * fix * add test * fix test * fix test * fix * fast return * fix test * add log
1 parent b2fd93a commit f092968

File tree

23 files changed

+984
-73
lines changed

23 files changed

+984
-73
lines changed

src/query/sql/src/executor/physical_plan_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ impl PhysicalPlanBuilder {
133133
self.build_materialized_cte(s_expr, materialized_cte, stat_info)
134134
.await
135135
}
136-
RelOperator::MaterializeCTERef(cte_consumer) => {
136+
RelOperator::MaterializedCTERef(cte_consumer) => {
137137
self.build_cte_consumer(cte_consumer, stat_info).await
138138
}
139139
RelOperator::Sequence(sequence) => {

src/query/sql/src/executor/physical_plans/physical_cte_consumer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ impl MaterializeCTERef {
4141
impl PhysicalPlanBuilder {
4242
pub(crate) async fn build_cte_consumer(
4343
&mut self,
44-
cte_consumer: &crate::plans::MaterializeCTERef,
44+
cte_consumer: &crate::plans::MaterializedCTERef,
4545
stat_info: PlanStatsInfo,
4646
) -> Result<PhysicalPlan> {
4747
let mut fields = Vec::new();

src/query/sql/src/planner/binder/bind_context.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use crate::binder::project_set::SetReturningInfo;
4343
use crate::binder::window::WindowInfo;
4444
use crate::binder::ColumnBindingBuilder;
4545
use crate::normalize_identifier;
46+
use crate::optimizer::ir::SExpr;
4647
use crate::plans::ScalarExpr;
4748
use crate::ColumnSet;
4849
use crate::IndexType;
@@ -197,9 +198,15 @@ impl CteContext {
197198
pub struct CteInfo {
198199
pub columns_alias: Vec<String>,
199200
pub query: Query,
200-
pub materialized: bool,
201201
pub recursive: bool,
202202
pub columns: Vec<ColumnBinding>,
203+
pub materialized_cte_info: Option<MaterializedCTEInfo>,
204+
}
205+
206+
#[derive(Clone, Debug)]
207+
pub struct MaterializedCTEInfo {
208+
pub bound_s_expr: SExpr,
209+
pub bound_context: BindContext,
203210
}
204211

205212
impl BindContext {

src/query/sql/src/planner/binder/bind_table_reference/bind_cte.rs

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::sync::Arc;
1617

1718
use databend_common_ast::ast::Query;
@@ -25,18 +26,21 @@ use crate::binder::BindContext;
2526
use crate::binder::Binder;
2627
use crate::binder::CteContext;
2728
use crate::binder::CteInfo;
29+
use crate::binder::MaterializedCTEInfo;
2830
use crate::normalize_identifier;
2931
use crate::optimizer::ir::SExpr;
30-
use crate::plans::MaterializeCTERef;
3132
use crate::plans::MaterializedCTE;
33+
use crate::plans::MaterializedCTERef;
3234
use crate::plans::RelOperator;
3335
use crate::plans::Sequence;
36+
use crate::ColumnBinding;
3437

3538
impl Binder {
3639
pub fn init_cte(&mut self, bind_context: &mut BindContext, with: &Option<With>) -> Result<()> {
3740
let Some(with) = with else {
3841
return Ok(());
3942
};
43+
4044
for cte in with.ctes.iter() {
4145
let cte_name = self.normalize_identifier(&cte.alias.name).name;
4246
if bind_context.cte_context.cte_map.contains_key(&cte_name) {
@@ -45,6 +49,21 @@ impl Binder {
4549
)));
4650
}
4751

52+
let materialized_cte_info = if cte.materialized {
53+
let (s_expr, cte_bind_context) = self.bind_cte_definition(
54+
&cte_name,
55+
&bind_context.cte_context.cte_map,
56+
&cte.query,
57+
)?;
58+
let materialized_cte_info = MaterializedCTEInfo {
59+
bound_s_expr: s_expr,
60+
bound_context: cte_bind_context,
61+
};
62+
Some(materialized_cte_info)
63+
} else {
64+
None
65+
};
66+
4867
let column_name = cte
4968
.alias
5069
.columns
@@ -56,8 +75,8 @@ impl Binder {
5675
columns_alias: column_name,
5776
query: *cte.query.clone(),
5877
recursive: with.recursive,
59-
materialized: cte.materialized,
6078
columns: vec![],
79+
materialized_cte_info,
6180
};
6281
bind_context.cte_context.cte_map.insert(cte_name, cte_info);
6382
}
@@ -70,6 +89,7 @@ impl Binder {
7089
table_name: &str,
7190
alias: &Option<TableAlias>,
7291
cte_info: &CteInfo,
92+
producer_column_bindings: &[ColumnBinding],
7393
) -> Result<(SExpr, BindContext)> {
7494
let (s_expr, cte_bind_context) = self.bind_cte_definition(
7595
table_name,
@@ -117,15 +137,24 @@ impl Binder {
117137
let output_columns = cte_output_columns.iter().map(|c| c.index).collect();
118138

119139
let mut new_bind_context = bind_context.clone();
120-
for column in cte_output_columns {
121-
new_bind_context.add_column_binding(column);
140+
for column in cte_output_columns.iter() {
141+
new_bind_context.add_column_binding(column.clone());
122142
}
123143

124-
let s_expr = SExpr::create_leaf(Arc::new(RelOperator::MaterializeCTERef(
125-
MaterializeCTERef {
144+
let mut column_mapping = HashMap::new();
145+
for (index_in_ref, index_in_producer) in cte_output_columns
146+
.iter()
147+
.zip(producer_column_bindings.iter())
148+
{
149+
column_mapping.insert(index_in_ref.index, index_in_producer.index);
150+
}
151+
152+
let s_expr = SExpr::create_leaf(Arc::new(RelOperator::MaterializedCTERef(
153+
MaterializedCTERef {
126154
cte_name: table_name.to_string(),
127155
output_columns,
128156
def: s_expr,
157+
column_mapping,
129158
},
130159
)));
131160
Ok((s_expr, new_bind_context))
@@ -164,13 +193,16 @@ impl Binder {
164193
let mut current_expr = main_query_expr;
165194

166195
for cte in with.ctes.iter().rev() {
167-
if cte.materialized {
168-
let cte_name = self.normalize_identifier(&cte.alias.name).name;
169-
let (s_expr, bind_context) =
170-
self.bind_cte_definition(&cte_name, &cte_context.cte_map, &cte.query)?;
196+
let cte_name = self.normalize_identifier(&cte.alias.name).name;
197+
let cte_info = cte_context.cte_map.get(&cte_name).ok_or_else(|| {
198+
ErrorCode::Internal(format!("CTE '{}' not found in context", cte_name))
199+
})?;
200+
if let Some(materialized_cte_info) = &cte_info.materialized_cte_info {
201+
let s_expr = materialized_cte_info.bound_s_expr.clone();
202+
let bind_context = materialized_cte_info.bound_context.clone();
171203

172204
let materialized_cte =
173-
MaterializedCTE::new(cte_name, Some(bind_context.columns), None);
205+
MaterializedCTE::new(cte_name, Some(bind_context.columns.clone()), None);
174206
let materialized_cte = SExpr::create_unary(materialized_cte, s_expr);
175207
let sequence = Sequence {};
176208
current_expr = SExpr::create_binary(sequence, materialized_cte, current_expr);

src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,14 @@ impl Binder {
7979

8080
let cte_map = bind_context.cte_context.cte_map.clone();
8181
if let Some(cte_info) = cte_map.get(&table_name) {
82-
if cte_info.materialized {
83-
return self.bind_cte_consumer(bind_context, &table_name, alias, cte_info);
82+
if let Some(materialized_cte_info) = &cte_info.materialized_cte_info {
83+
return self.bind_cte_consumer(
84+
bind_context,
85+
&table_name,
86+
alias,
87+
cte_info,
88+
&materialized_cte_info.bound_context.columns,
89+
);
8490
} else {
8591
if self
8692
.metadata

src/query/sql/src/planner/optimizer/ir/format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ fn display_rel_op(rel_op: &RelOperator) -> String {
8181
RelOperator::MutationSource(_) => "MutationSource".to_string(),
8282
RelOperator::CompactBlock(_) => "CompactBlock".to_string(),
8383
RelOperator::MaterializedCTE(_) => "MaterializedCTE".to_string(),
84-
RelOperator::MaterializeCTERef(_) => "MaterializeCTERef".to_string(),
84+
RelOperator::MaterializedCTERef(_) => "MaterializeCTERef".to_string(),
8585
RelOperator::Sequence(_) => "Sequence".to_string(),
8686
}
8787
}

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::optimizer::optimizers::operator::SubqueryDecorrelatorOptimizer;
3636
use crate::optimizer::optimizers::recursive::RecursiveRuleOptimizer;
3737
use crate::optimizer::optimizers::rule::RuleID;
3838
use crate::optimizer::optimizers::rule::DEFAULT_REWRITE_RULES;
39+
use crate::optimizer::optimizers::CTEFilterPushdownOptimizer;
3940
use crate::optimizer::optimizers::CascadesOptimizer;
4041
use crate::optimizer::optimizers::DPhpyOptimizer;
4142
use crate::optimizer::pipeline::OptimizerPipeline;
@@ -254,32 +255,34 @@ pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Re
254255
opt_ctx.clone(),
255256
&DEFAULT_REWRITE_RULES,
256257
))
257-
// 8. Run post rewrite rules
258+
// 8. CTE filter pushdown optimization
259+
.add(CTEFilterPushdownOptimizer::new(opt_ctx.clone()))
260+
// 9. Run post rewrite rules
258261
.add(RecursiveRuleOptimizer::new(opt_ctx.clone(), &[
259262
RuleID::SplitAggregate,
260263
]))
261-
// 9. Apply DPhyp algorithm for cost-based join reordering
264+
// 10. Apply DPhyp algorithm for cost-based join reordering
262265
.add(DPhpyOptimizer::new(opt_ctx.clone()))
263-
// 10. After join reorder, Convert some single join to inner join.
266+
// 11. After join reorder, Convert some single join to inner join.
264267
.add(SingleToInnerOptimizer::new())
265-
// 11. Deduplicate join conditions.
268+
// 12. Deduplicate join conditions.
266269
.add(DeduplicateJoinConditionOptimizer::new())
267-
// 12. Apply join commutativity to further optimize join ordering
270+
// 13. Apply join commutativity to further optimize join ordering
268271
.add_if(
269272
opt_ctx.get_enable_join_reorder(),
270273
RecursiveRuleOptimizer::new(opt_ctx.clone(), [RuleID::CommuteJoin].as_slice()),
271274
)
272-
// 13. Cascades optimizer may fail due to timeout, fallback to heuristic optimizer in this case.
275+
// 14. Cascades optimizer may fail due to timeout, fallback to heuristic optimizer in this case.
273276
.add(CascadesOptimizer::new(opt_ctx.clone())?)
274-
// 14. Eliminate unnecessary scalar calculations to clean up the final plan
277+
// 15. Eliminate unnecessary scalar calculations to clean up the final plan
275278
.add_if(
276279
!opt_ctx.get_planning_agg_index(),
277280
RecursiveRuleOptimizer::new(opt_ctx.clone(), [RuleID::EliminateEvalScalar].as_slice()),
278281
)
279-
// 15. Clean up unused CTEs
282+
// 16. Clean up unused CTEs
280283
.add(CleanupUnusedCTEOptimizer);
281284

282-
// 15. Execute the pipeline
285+
// 17. Execute the pipeline
283286
let s_expr = pipeline.execute().await?;
284287

285288
Ok(s_expr)

0 commit comments

Comments
 (0)