Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 1 addition & 27 deletions src/meta/api/src/data_mask_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
use databend_common_meta_app::id_generator::IdGenerator;
use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_app::tenant_key::errors::ExistError;
use databend_common_meta_app::KeyWithTenant;
Expand Down Expand Up @@ -71,31 +70,6 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {

let mut txn = TxnRequest::default();

let res = self.get_id_and_value(name_ident).await?;
debug!(res :? = res, name_key :? =(name_ident); "create_data_mask");

let mut curr_seq = 0;

if let Some((seq_id, seq_meta)) = res {
match req.create_option {
CreateOption::Create => {
return Ok(Err(
name_ident.exist_error(format!("{} already exists", req.name))
));
}
CreateOption::CreateIfNotExists => {
return Ok(Ok(CreateDatamaskReply { id: *seq_id.data }));
}
CreateOption::CreateOrReplace => {
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());

txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);

curr_seq = seq_id.seq;
}
};
}

// Create data mask by inserting these record:
// name -> id
// id -> policy
Expand All @@ -114,7 +88,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
{
let meta: DatamaskMeta = req.data_mask_meta.clone();
let id_list = MaskpolicyTableIdList::default();
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
txn.condition.push(txn_cond_eq_seq(name_ident, 0));
txn.condition
.push(txn_cond_eq_seq(&row_access_name_ident, 0));
txn.if_then.extend(vec![
Expand Down
21 changes: 1 addition & 20 deletions src/meta/api/src/row_access_policy_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,6 @@ impl<KV: kvapi::KVApi<Error = MetaError>> RowAccessPolicyApi for KV {
let policy_id = RowAccessPolicyId::new(row_access_id);
let mut txn = TxnRequest::default();

let res = self.get_id_and_value(name_ident).await?;
debug!(res :? = res, name_key :? =(name_ident); "create_row_access");

let mut curr_seq = 0;

if let Some((seq_id, seq_meta)) = res {
if req.can_replace {
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());

txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);

// TODO(eason): need to remove row policy from table meta

curr_seq = seq_id.seq;
} else {
return Ok(Err(name_ident.exist_error(func_name!())));
}
}

// Create row policy by inserting these record:
// name -> id
// id -> policy
Expand All @@ -103,7 +84,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> RowAccessPolicyApi for KV {

{
let meta: RowAccessPolicyMeta = req.row_access_policy_meta.clone();
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
txn.condition.push(txn_cond_eq_seq(name_ident, 0));
txn.condition.push(txn_cond_eq_seq(&mask_name_ident, 0));
txn.if_then.extend(vec![
txn_op_put_pb(name_ident, &policy_id, None)?, // name -> policy_id
Expand Down
60 changes: 0 additions & 60 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use databend_common_expression::VIRTUAL_COLUMNS_LIMIT;
use databend_common_expression::VIRTUAL_COLUMN_ID_START;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::data_mask::CreateDatamaskReq;
use databend_common_meta_app::data_mask::DataMaskIdIdent;
use databend_common_meta_app::data_mask::DataMaskNameIdent;
use databend_common_meta_app::data_mask::DatamaskMeta;
use databend_common_meta_app::data_mask::MaskPolicyIdTableId;
Expand Down Expand Up @@ -3184,7 +3183,6 @@ impl SchemaApiTestSuite {
let mask2_id;
{
let req = CreateDatamaskReq {
create_option: CreateOption::CreateIfNotExists,
name: DataMaskNameIdent::new(tenant.clone(), mask_name_1.to_string()),
data_mask_meta: DatamaskMeta {
args: vec![],
Expand All @@ -3201,7 +3199,6 @@ impl SchemaApiTestSuite {
mask1_id = get_kv_u64_data(mt, &mask1_name_ident).await?;

let req = CreateDatamaskReq {
create_option: CreateOption::CreateIfNotExists,
name: DataMaskNameIdent::new(tenant.clone(), mask_name_2.to_string()),
data_mask_meta: DatamaskMeta {
args: vec![],
Expand Down Expand Up @@ -3457,57 +3454,6 @@ impl SchemaApiTestSuite {
assert!(res.is_none());
}

info!("--- create or replace mask policy");
{
let mask_name = "replace_mask";
let name = DataMaskNameIdent::new(tenant.clone(), mask_name);
let req = CreateDatamaskReq {
create_option: CreateOption::CreateIfNotExists,
name: name.clone(),
data_mask_meta: DatamaskMeta {
args: vec![],
return_type: "".to_string(),
body: "".to_string(),
comment: Some("before".to_string()),
create_on: created_on,
update_on: None,
},
};
mt.create_data_mask(req).await??;
let old_id: u64 = get_kv_u64_data(mt, &name).await?;

let id_key = DataMaskIdIdent::new(&tenant, old_id);

let meta: DatamaskMeta = get_kv_data(mt, &id_key).await?;
assert_eq!(meta.comment, Some("before".to_string()));

let req = CreateDatamaskReq {
create_option: CreateOption::CreateOrReplace,
name: name.clone(),
data_mask_meta: DatamaskMeta {
args: vec![],
return_type: "".to_string(),
body: "".to_string(),
comment: Some("after".to_string()),
create_on: created_on,
update_on: None,
},
};
mt.create_data_mask(req).await??;

// assert old id key has been deleted
let meta: Result<DatamaskMeta, KVAppError> = get_kv_data(mt, &id_key).await;
assert!(meta.is_err());

let id: u64 = get_kv_u64_data(mt, &name).await?;
assert_ne!(old_id, id);

let id_key = DataMaskIdIdent::new(&tenant, id);

let meta: DatamaskMeta = get_kv_data(mt, &id_key).await?;
assert_eq!(meta.comment, Some("after".to_string()));
}

Ok(())
}

Expand Down Expand Up @@ -3549,7 +3495,6 @@ impl SchemaApiTestSuite {
info!("--- create row access policy");

let req = CreateRowAccessPolicyReq {
can_replace: false,
name: RowAccessPolicyNameIdent::new(tenant.clone(), policy1.to_string()),
row_access_policy_meta: RowAccessPolicyMeta {
args: vec![("number".to_string(), "UInt64".to_string())],
Expand All @@ -3566,7 +3511,6 @@ impl SchemaApiTestSuite {
let policy1_id = res.0.data;

let req = CreateRowAccessPolicyReq {
can_replace: false,
name: RowAccessPolicyNameIdent::new(tenant.clone(), policy2.to_string()),
row_access_policy_meta: RowAccessPolicyMeta {
args: vec![("number".to_string(), "UInt64".to_string())],
Expand Down Expand Up @@ -3887,7 +3831,6 @@ impl SchemaApiTestSuite {
let mask_cleanup_ident =
DataMaskNameIdent::new(tenant.clone(), mask_cleanup_name.to_string());
mt.create_data_mask(CreateDatamaskReq {
create_option: CreateOption::Create,
name: mask_cleanup_ident.clone(),
data_mask_meta: DatamaskMeta {
args: vec![],
Expand Down Expand Up @@ -3950,7 +3893,6 @@ impl SchemaApiTestSuite {
// Create another masking policy and bind it.
let mask_guard_ident = DataMaskNameIdent::new(tenant.clone(), mask_guard_name.to_string());
mt.create_data_mask(CreateDatamaskReq {
create_option: CreateOption::Create,
name: mask_guard_ident.clone(),
data_mask_meta: DatamaskMeta {
args: vec![],
Expand Down Expand Up @@ -4043,7 +3985,6 @@ impl SchemaApiTestSuite {
let policy_cleanup_ident =
RowAccessPolicyNameIdent::new(tenant.clone(), policy_cleanup_name.to_string());
mt.create_row_access_policy(CreateRowAccessPolicyReq {
can_replace: false,
name: policy_cleanup_ident.clone(),
row_access_policy_meta: RowAccessPolicyMeta {
args: vec![("number".to_string(), "UInt64".to_string())],
Expand Down Expand Up @@ -4117,7 +4058,6 @@ impl SchemaApiTestSuite {
let policy_guard_ident =
RowAccessPolicyNameIdent::new(tenant.clone(), policy_guard_name.to_string());
mt.create_row_access_policy(CreateRowAccessPolicyReq {
can_replace: false,
name: policy_guard_ident.clone(),
row_access_policy_meta: RowAccessPolicyMeta {
args: vec![("number".to_string(), "UInt64".to_string())],
Expand Down
3 changes: 0 additions & 3 deletions src/meta/app/src/data_mask/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ pub use mask_policy_policy_table_id_ident::MaskPolicyIdTableId;
pub use mask_policy_policy_table_id_ident::MaskPolicyTableIdIdent;
pub use mask_policy_table_id_list_ident::MaskPolicyTableIdListIdent;

use crate::schema::CreateOption;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct DatamaskMeta {
// Vec<(arg_name, arg_type)>
Expand All @@ -43,7 +41,6 @@ pub struct DatamaskMeta {

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CreateDatamaskReq {
pub create_option: CreateOption,
pub name: DataMaskNameIdent,
pub data_mask_meta: DatamaskMeta,
}
Expand Down
1 change: 0 additions & 1 deletion src/meta/app/src/row_access_policy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub struct RowAccessPolicyMeta {

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CreateRowAccessPolicyReq {
pub can_replace: bool,
pub name: RowAccessPolicyNameIdent,
pub row_access_policy_meta: RowAccessPolicyMeta,
}
Expand Down
11 changes: 3 additions & 8 deletions src/query/ast/src/ast/statements/data_mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use derive_visitor::Drive;
use derive_visitor::DriveMut;

use crate::ast::quote::QuotedString;
use crate::ast::CreateOption;
use crate::ast::Expr;
use crate::ast::TypeName;

Expand All @@ -39,19 +38,15 @@ pub struct DataMaskPolicy {

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct CreateDatamaskPolicyStmt {
pub create_option: CreateOption,
pub if_not_exists: bool,
pub name: String,
pub policy: DataMaskPolicy,
}

impl Display for CreateDatamaskPolicyStmt {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "CREATE ")?;
if let CreateOption::CreateOrReplace = self.create_option {
write!(f, "OR REPLACE ")?;
}
write!(f, "MASKING POLICY ")?;
if let CreateOption::CreateIfNotExists = self.create_option {
write!(f, "CREATE MASKING POLICY ")?;
if self.if_not_exists {
write!(f, "IF NOT EXISTS ")?;
}
write!(f, "{} AS (", self.name)?;
Expand Down
13 changes: 4 additions & 9 deletions src/query/ast/src/ast/statements/row_access_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use derive_visitor::Drive;
use derive_visitor::DriveMut;

use crate::ast::write_comma_separated_list;
use crate::ast::CreateOption;
use crate::ast::Expr;
use crate::ast::Identifier;
use crate::ast::TypeName;
Expand Down Expand Up @@ -54,23 +53,19 @@ impl Display for RowAccessPolicyDefinition {

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct CreateRowAccessPolicyStmt {
pub create_option: CreateOption,
pub if_not_exists: bool,
pub name: Identifier,
pub description: Option<String>,
pub definition: RowAccessPolicyDefinition,
}

// CREATE [ OR REPLACE ] ROW ACCESS POLICY [ IF NOT EXISTS ] <name> AS
// CREATE ROW ACCESS POLICY [ IF NOT EXISTS ] <name> AS
// ( <arg_name> <arg_type> [ , ... ] ) RETURNS BOOLEAN -> <body>
// [ COMMENT = '<string_literal>' ]
impl Display for CreateRowAccessPolicyStmt {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "CREATE")?;
if let CreateOption::CreateOrReplace = self.create_option {
write!(f, " OR REPLACE")?;
}
write!(f, " ROW ACCESS POLICY")?;
if let CreateOption::CreateIfNotExists = self.create_option {
write!(f, "CREATE ROW ACCESS POLICY")?;
if self.if_not_exists {
write!(f, " IF NOT EXISTS")?;
}
write!(f, " {} AS {}", self.name, self.definition)?;
Expand Down
38 changes: 16 additions & 22 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1826,26 +1826,22 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
);

// row policy
// CREATE [ OR REPLACE ] ROW ACCESS POLICY [ IF NOT EXISTS ] <name> AS
// CREATE ROW ACCESS POLICY [ IF NOT EXISTS ] <name> AS
// ( <arg_name> <arg_type> [ , ... ] ) RETURNS BOOLEAN -> <body>
// [ COMMENT = '<string_literal>' ]
let create_row_access = map_res(
let create_row_access = map(
rule! {
CREATE ~ ( OR ~ ^REPLACE )? ~ ROW ~ ACCESS ~ POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )?
CREATE ~ ROW ~ ACCESS ~ POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )?
~ #ident ~ #row_access_definition
~ ( COMMENT ~ ^"=" ~ ^#literal_string )?
},
|(_, opt_or_replace, _, _, _, opt_if_not_exists, name, definition, opt_comment)| {
let create_option =
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;
Ok(Statement::CreateRowAccessPolicy(
CreateRowAccessPolicyStmt {
create_option,
name,
description: opt_comment.map(|(_, _, description)| description),
definition,
},
))
|(_, _, _, _, opt_if_not_exists, name, definition, opt_comment)| {
Statement::CreateRowAccessPolicy(CreateRowAccessPolicyStmt {
if_not_exists: opt_if_not_exists.is_some(),
name,
description: opt_comment.map(|(_, _, description)| description),
definition,
})
},
);
let drop_row_access = map(
Expand Down Expand Up @@ -2066,19 +2062,17 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
let show_file_formats = value(Statement::ShowFileFormats, rule! { SHOW ~ FILE ~ FORMATS });

// data mark policy
let create_data_mask_policy = map_res(
let create_data_mask_policy = map(
rule! {
CREATE ~ ( OR ~ ^REPLACE )? ~ MASKING ~ POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ #ident ~ #data_mask_policy
CREATE ~ MASKING ~ POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ #ident ~ #data_mask_policy
},
|(_, opt_or_replace, _, _, opt_if_not_exists, name, policy)| {
let create_option =
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;
|(_, _, _, opt_if_not_exists, name, policy)| {
let stmt = CreateDatamaskPolicyStmt {
create_option,
if_not_exists: opt_if_not_exists.is_some(),
name: name.to_string(),
policy,
};
Ok(Statement::CreateDatamaskPolicy(stmt))
Statement::CreateDatamaskPolicy(stmt)
},
);
let drop_data_mask_policy = map(
Expand Down Expand Up @@ -2787,7 +2781,7 @@ AS
[ WEBHOOK = ( url = <string_literal>, method = <string_literal>, authorization_header = <string_literal> ) ]
[ COMMENT = '<string_literal>' ]`"
| #create_connection: "`CREATE [OR REPLACE] CONNECTION [IF NOT EXISTS] <connection_name> STORAGE_TYPE = <type> <storage_configs>`"
| #create_row_access : "`CREATE [ OR REPLACE ] ROW ACCESS POLICY [ IF NOT EXISTS ] <name> AS ( <arg_name> <arg_type> [ , ... ] ) RETURNS BOOLEAN -> <body> [ COMMENT = '<string_literal>' ]`"
| #create_row_access : "`CREATE ROW ACCESS POLICY [ IF NOT EXISTS ] <name> AS ( <arg_name> <arg_type> [ , ... ] ) RETURNS BOOLEAN -> <body> [ COMMENT = '<string_literal>' ]`"
| #create_user : "`CREATE [OR REPLACE] USER [IF NOT EXISTS] '<username>' IDENTIFIED [WITH <auth_type>] [BY <password>] [WITH <user_option>, ...]`"
| #create_role : "`CREATE ROLE [IF NOT EXISTS] <role_name> [COMMENT ='<string_literal>']`"
| #create_udf : "`CREATE [OR REPLACE] FUNCTION [IF NOT EXISTS] <udf_name> <udf_definition> [DESC = <description>]`"
Expand Down
5 changes: 2 additions & 3 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,6 @@ SELECT * from s;"#,
r#"SELECT * FROM t GROUP BY a, ROLLUP (b, c)"#,
r#"SELECT * FROM t GROUP BY GROUPING SETS ((a, b)), a, ROLLUP (b, c)"#,
r#"CREATE MASKING POLICY email_mask AS (val STRING) RETURNS STRING -> CASE WHEN current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy'"#,
r#"CREATE OR REPLACE MASKING POLICY email_mask AS (val STRING) RETURNS STRING -> CASE WHEN current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy'"#,
r#"DESC MASKING POLICY email_mask"#,
r#"DROP MASKING POLICY IF EXISTS email_mask"#,
r#"REFRESH VIRTUAL COLUMN FOR t"#,
Expand Down Expand Up @@ -974,12 +973,12 @@ SELECT * from s;"#,
r#"ALTER TABLE p1 CONNECTION=(CONNECTION_NAME='test')"#,
r#"ALTER table t connection=(access_key_id ='x' secret_access_key ='y' endpoint_url='http://127.0.0.1:9900')"#,
// row policy
r#"create or replace row access policy rap_it as (empl_id varchar) returns boolean ->
r#"create row access policy rap_it as (empl_id varchar) returns boolean ->
case
when 'it_admin' = current_role() then true
else false
end"#,
r#"create or replace row access policy rap_sales_manager_regions_1 as (sales_region varchar) returns boolean ->
r#"create row access policy if not exists rap_sales_manager_regions_1 as (sales_region varchar) returns boolean ->
'sales_executive_role' = current_role()
or exists (
select 1 from salesmanagerregions
Expand Down
Loading
Loading