Skip to content

Commit e81d8da

Browse files
authored
fix: union all with order by (#5674)
1 parent 1a5350f commit e81d8da

File tree

1 file changed

+28
-7
lines changed
  • src/service/search/datafusion/distributed_plan

1 file changed

+28
-7
lines changed

src/service/search/datafusion/distributed_plan/rewrite.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ use datafusion::{
2121
tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor},
2222
Result, TableReference,
2323
},
24+
physical_expr::LexOrdering,
2425
physical_plan::{
25-
repartition::RepartitionExec, ExecutionPlan, ExecutionPlanProperties, Partitioning,
26+
repartition::RepartitionExec,
27+
sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
28+
ExecutionPlan, ExecutionPlanProperties, Partitioning,
2629
},
2730
};
2831
use hashbrown::HashMap;
@@ -120,12 +123,30 @@ impl TreeNodeRewriter for RemoteScanRewriter {
120123
for child in node.children() {
121124
let mut visitor = TableNameVisitor::new();
122125
child.visit(&mut visitor)?;
123-
let table_name = visitor.table_name.clone().unwrap();
124-
let remote_scan = Arc::new(RemoteScanExec::new(
125-
child.clone(),
126-
self.remote_scan_nodes.get_remote_node(&table_name),
127-
)?);
128-
new_children.push(remote_scan);
126+
// For sort, we should add a SortPreservingMergeExec
127+
if child.name() == "SortExec" {
128+
let table_name = visitor.table_name.clone().unwrap();
129+
let sort = child.as_any().downcast_ref::<SortExec>().unwrap();
130+
let sort_merge = Arc::new(
131+
SortPreservingMergeExec::new(
132+
LexOrdering::new(sort.expr().to_vec()),
133+
Arc::new(sort.clone()),
134+
)
135+
.with_fetch(sort.fetch()),
136+
);
137+
let remote_scan = Arc::new(RemoteScanExec::new(
138+
sort_merge,
139+
self.remote_scan_nodes.get_remote_node(&table_name),
140+
)?);
141+
new_children.push(remote_scan);
142+
} else {
143+
let table_name = visitor.table_name.clone().unwrap();
144+
let remote_scan = Arc::new(RemoteScanExec::new(
145+
child.clone(),
146+
self.remote_scan_nodes.get_remote_node(&table_name),
147+
)?);
148+
new_children.push(remote_scan);
149+
}
129150
}
130151
let new_node = node.with_new_children(new_children)?;
131152
self.is_changed = true;

0 commit comments

Comments
 (0)