Skip to content

feat: supported related_subquery #275

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ let kite_sql = DataBaseBuilder::path("./data")
- [x] View
- Drop
- [x] Table
- [ ] Index
- [x] Index
- Tips: `Drop Index table_name.index_name`
- [x] View
- Alert
- [x] Add Column
Expand Down
35 changes: 35 additions & 0 deletions src/binder/drop_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::binder::{lower_ident, Binder};
use crate::errors::DatabaseError;
use crate::planner::operator::drop_index::DropIndexOperator;
use crate::planner::operator::Operator;
use crate::planner::{Childrens, LogicalPlan};
use crate::storage::Transaction;
use crate::types::value::DataValue;
use sqlparser::ast::ObjectName;
use std::sync::Arc;

impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A> {
pub(crate) fn bind_drop_index(
&mut self,
name: &ObjectName,
if_exists: &bool,
) -> Result<LogicalPlan, DatabaseError> {
let table_name = name
.0
.first()
.ok_or(DatabaseError::InvalidTable(name.to_string()))?;
let index_name = name.0.get(1).ok_or(DatabaseError::InvalidIndex)?;

let table_name = Arc::new(lower_ident(table_name));
let index_name = lower_ident(index_name);

Ok(LogicalPlan::new(
Operator::DropIndex(DropIndexOperator {
table_name,
index_name,
if_exists: *if_exists,
}),
Childrens::None,
))
}
}
13 changes: 11 additions & 2 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use sqlparser::ast::{
BinaryOperator, CharLengthUnits, DataType, Expr, Function, FunctionArg, FunctionArgExpr, Ident,
Query, UnaryOperator, Value,
};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::slice;
use std::sync::Arc;

Expand Down Expand Up @@ -293,6 +293,7 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
self.args,
Some(self),
);

let mut sub_query = binder.bind_query(subquery)?;
let sub_query_schema = sub_query.output_schema();

Expand Down Expand Up @@ -368,7 +369,15 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
try_default!(&full_name.0, full_name.1);
}
if let Some(table) = full_name.0.or(bind_table_name) {
let source = self.context.bind_source(&table)?;
let (source, is_parent) = self.context.bind_source::<A>(self.parent, &table, false)?;

if is_parent {
self.parent_table_col
.entry(Arc::new(table.clone()))
.or_default()
.insert(full_name.1.clone());
}

let schema_buf = self.table_schema_buf.entry(Arc::new(table)).or_default();

Ok(ScalarExpression::ColumnRef(
Expand Down
15 changes: 13 additions & 2 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod create_view;
mod delete;
mod describe;
mod distinct;
mod drop_index;
mod drop_table;
mod drop_view;
mod explain;
Expand Down Expand Up @@ -275,12 +276,19 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
Ok(source)
}

pub fn bind_source<'b: 'a>(&self, table_name: &str) -> Result<&Source, DatabaseError> {
pub fn bind_source<'b: 'a, A: AsRef<[(&'static str, DataValue)]>>(
&self,
parent: Option<&'a Binder<'a, 'b, T, A>>,
table_name: &str,
is_parent: bool,
) -> Result<(&'b Source, bool), DatabaseError> {
if let Some(source) = self.bind_table.iter().find(|((t, alias, _), _)| {
t.as_str() == table_name
|| matches!(alias.as_ref().map(|a| a.as_str() == table_name), Some(true))
}) {
Ok(source.1)
Ok((source.1, is_parent))
} else if let Some(binder) = parent {
binder.context.bind_source(binder.parent, table_name, true)
} else {
Err(DatabaseError::InvalidTable(table_name.into()))
}
Expand Down Expand Up @@ -322,6 +330,7 @@ pub struct Binder<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>>
args: &'a A,
with_pk: Option<TableName>,
pub(crate) parent: Option<&'b Binder<'a, 'b, T, A>>,
pub(crate) parent_table_col: HashMap<TableName, HashSet<String>>,
}

impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, 'b, T, A> {
Expand All @@ -336,6 +345,7 @@ impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '
args,
with_pk: None,
parent,
parent_table_col: Default::default(),
}
}

Expand Down Expand Up @@ -375,6 +385,7 @@ impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '
match object_type {
ObjectType::Table => self.bind_drop_table(&names[0], if_exists)?,
ObjectType::View => self.bind_drop_view(&names[0], if_exists)?,
ObjectType::Index => self.bind_drop_index(&names[0], if_exists)?,
_ => {
return Err(DatabaseError::UnsupportedStmt(
"only `Table` and `View` are allowed to be Dropped".to_string(),
Expand Down
7 changes: 6 additions & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,18 @@ impl<S: Storage> State<S> {

let best_plan = Self::default_optimizer(source_plan)
.find_best(Some(&transaction.meta_loader(meta_cache)))?;
// println!("best_plan plan: {:#?}", best_plan);
//println!("best_plan plan: {:#?}", best_plan);

Ok(best_plan)
}

pub(crate) fn default_optimizer(source_plan: LogicalPlan) -> HepOptimizer {
HepOptimizer::new(source_plan)
.batch(
"Correlated Subquery".to_string(),
HepBatchStrategy::once_topdown(),
vec![NormalizationRuleImpl::CorrelateSubquery],
)
.batch(
"Column Pruning".to_string(),
HepBatchStrategy::once_topdown(),
Expand Down
43 changes: 43 additions & 0 deletions src/execution/ddl/drop_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::execution::{Executor, WriteExecutor};
use crate::planner::operator::drop_index::DropIndexOperator;
use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
use crate::throw;
use crate::types::tuple_builder::TupleBuilder;

pub struct DropIndex {
op: DropIndexOperator,
}

impl From<DropIndexOperator> for DropIndex {
fn from(op: DropIndexOperator) -> Self {
Self { op }
}
}

impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropIndex {
fn execute_mut(
self,
(table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
transaction: *mut T,
) -> Executor<'a> {
Box::new(
#[coroutine]
move || {
let DropIndexOperator {
table_name,
index_name,
if_exists,
} = self.op;

throw!(unsafe { &mut (*transaction) }.drop_index(
table_cache,
table_name,
&index_name,
if_exists
));

yield Ok(TupleBuilder::build_result(index_name.to_string()));
},
)
}
}
1 change: 1 addition & 0 deletions src/execution/ddl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub(crate) mod create_index;
pub(crate) mod create_table;
pub(crate) mod create_view;
pub(crate) mod drop_column;
pub(crate) mod drop_index;
pub(crate) mod drop_table;
pub(crate) mod drop_view;
pub(crate) mod truncate;
15 changes: 11 additions & 4 deletions src/execution/dml/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::fmt::Formatter;
use std::fs::DirEntry;
use std::ops::Coroutine;
use std::ops::CoroutineState;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::{fmt, fs};
Expand Down Expand Up @@ -98,10 +99,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze {
}
drop(coroutine);
let mut values = Vec::with_capacity(builders.len());
let dir_path = dirs::home_dir()
.expect("Your system does not have a Config directory!")
.join(DEFAULT_STATISTICS_META_PATH)
.join(table_name.as_str());
let dir_path = Self::build_statistics_meta_path(&table_name);
// For DEBUG
// println!("Statistics Path: {:#?}", dir_path);
throw!(fs::create_dir_all(&dir_path).map_err(DatabaseError::IO));
Expand Down Expand Up @@ -149,6 +147,15 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze {
}
}

impl Analyze {
pub fn build_statistics_meta_path(table_name: &TableName) -> PathBuf {
dirs::home_dir()
.expect("Your system does not have a Config directory!")
.join(DEFAULT_STATISTICS_META_PATH)
.join(table_name.as_str())
}
}

impl fmt::Display for AnalyzeOperator {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let indexes = self.index_metas.iter().map(|index| &index.name).join(", ");
Expand Down
5 changes: 4 additions & 1 deletion src/execution/dql/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Filter {

let schema = input.output_schema().clone();

//println!("{:#?}114514'\n'1919810{:#?}",predicate,schema);

let mut coroutine = build_read(input, cache, transaction);

while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) {
let tuple = throw!(tuple);

//println!("-> Coroutine returned: {:?}", tuple);
if throw!(throw!(predicate.eval(Some((&tuple, &schema)))).is_true()) {
//println!("-> throw!: {:?}", tuple);
yield Ok(tuple);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::execution::ddl::create_index::CreateIndex;
use crate::execution::ddl::create_table::CreateTable;
use crate::execution::ddl::create_view::CreateView;
use crate::execution::ddl::drop_column::DropColumn;
use crate::execution::ddl::drop_index::DropIndex;
use crate::execution::ddl::drop_table::DropTable;
use crate::execution::ddl::drop_view::DropView;
use crate::execution::ddl::truncate::Truncate;
Expand Down Expand Up @@ -194,6 +195,7 @@ pub fn build_write<'a, T: Transaction + 'a>(
Operator::CreateView(op) => CreateView::from(op).execute_mut(cache, transaction),
Operator::DropTable(op) => DropTable::from(op).execute_mut(cache, transaction),
Operator::DropView(op) => DropView::from(op).execute_mut(cache, transaction),
Operator::DropIndex(op) => DropIndex::from(op).execute_mut(cache, transaction),
Operator::Truncate(op) => Truncate::from(op).execute_mut(cache, transaction),
Operator::CopyFromFile(op) => CopyFromFile::from(op).execute_mut(cache, transaction),
Operator::CopyToFile(op) => {
Expand Down
3 changes: 2 additions & 1 deletion src/optimizer/heuristic/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl HepGraph {
source_id: HepNodeId,
children_option: Option<HepNodeId>,
new_node: Operator,
) {
) -> HepNodeId {
let new_index = self.graph.add_node(new_node);
let mut order = self.graph.edges(source_id).count();

Expand All @@ -95,6 +95,7 @@ impl HepGraph {

self.graph.add_edge(source_id, new_index, order);
self.version += 1;
new_index
}

pub fn replace_node(&mut self, source_id: HepNodeId, new_node: Operator) {
Expand Down
1 change: 1 addition & 0 deletions src/optimizer/rule/normalization/column_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ impl ColumnPruning {
| Operator::CreateView(_)
| Operator::DropTable(_)
| Operator::DropView(_)
| Operator::DropIndex(_)
| Operator::Truncate(_)
| Operator::ShowTable
| Operator::ShowView
Expand Down
2 changes: 2 additions & 0 deletions src/optimizer/rule/normalization/compilation_in_advance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl ExpressionRemapper {
| Operator::CreateView(_)
| Operator::DropTable(_)
| Operator::DropView(_)
| Operator::DropIndex(_)
| Operator::Truncate(_)
| Operator::CopyFromFile(_)
| Operator::CopyToFile(_)
Expand Down Expand Up @@ -212,6 +213,7 @@ impl EvaluatorBind {
| Operator::CreateView(_)
| Operator::DropTable(_)
| Operator::DropView(_)
| Operator::DropIndex(_)
| Operator::Truncate(_)
| Operator::CopyFromFile(_)
| Operator::CopyToFile(_)
Expand Down
Loading
Loading