Skip to content

Commit 4085be4

Browse files
authored
feat(query): enable swap between tables (#18767)
* feat(query): enable swap between tables * only the current role have OWNERSHIP privileges on the tables can execute swap
1 parent ad48962 commit 4085be4

File tree

31 files changed

+741
-0
lines changed

31 files changed

+741
-0
lines changed

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyAction;
115115
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
116116
use databend_common_meta_app::schema::SetTableRowAccessPolicyAction;
117117
use databend_common_meta_app::schema::SetTableRowAccessPolicyReq;
118+
use databend_common_meta_app::schema::SwapTableReq;
118119
use databend_common_meta_app::schema::TableCopiedFileInfo;
119120
use databend_common_meta_app::schema::TableCopiedFileNameIdent;
120121
use databend_common_meta_app::schema::TableId;
@@ -310,6 +311,7 @@ impl SchemaApiTestSuite {
310311
.drop_table_without_table_id_list(&b.build().await)
311312
.await?;
312313
suite.table_rename(&b.build().await).await?;
314+
suite.table_swap(&b.build().await).await?;
313315
suite.table_update_meta(&b.build().await).await?;
314316
suite.table_update_mask_policy(&b.build().await).await?;
315317
suite
@@ -2377,6 +2379,179 @@ impl SchemaApiTestSuite {
23772379
Ok(())
23782380
}
23792381

2382+
#[fastrace::trace]
2383+
async fn table_swap<MT: SchemaApi + kvapi::KVApi<Error = MetaError>>(
2384+
&self,
2385+
mt: &MT,
2386+
) -> anyhow::Result<()> {
2387+
let tenant_name = "tenant1";
2388+
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
2389+
2390+
let db1_name = "db1";
2391+
let tb1_name = "table_a";
2392+
let tb2_name = "table_b";
2393+
2394+
let schema = || {
2395+
Arc::new(TableSchema::new(vec![TableField::new(
2396+
"number",
2397+
TableDataType::Number(NumberDataType::UInt64),
2398+
)]))
2399+
};
2400+
2401+
let _table_meta = |created_on| TableMeta {
2402+
schema: schema(),
2403+
engine: "JSON".to_string(),
2404+
options: maplit::btreemap! {"opt-1".into() => "val-1".into()},
2405+
created_on,
2406+
..TableMeta::default()
2407+
};
2408+
2409+
let swap_req = |if_exists| SwapTableReq {
2410+
if_exists,
2411+
origin_table: TableNameIdent {
2412+
tenant: tenant.clone(),
2413+
db_name: db1_name.to_string(),
2414+
table_name: tb1_name.to_string(),
2415+
},
2416+
target_table_name: tb2_name.to_string(),
2417+
};
2418+
2419+
info!("--- swap tables on unknown db");
2420+
{
2421+
let got = mt.swap_table(swap_req(false)).await;
2422+
debug!("--- swap tables on unknown database got: {:?}", got);
2423+
2424+
assert!(got.is_err());
2425+
assert_eq!(
2426+
ErrorCode::UNKNOWN_DATABASE,
2427+
ErrorCode::from(got.unwrap_err()).code()
2428+
);
2429+
}
2430+
2431+
info!("--- prepare db and tables");
2432+
let created_on = Utc::now();
2433+
let mut util1 = Util::new(mt, tenant_name, db1_name, tb1_name, "JSON");
2434+
util1.create_db().await?;
2435+
2436+
let mut util2 = Util::new(mt, tenant_name, db1_name, tb2_name, "JSON");
2437+
2438+
info!("--- create table_a");
2439+
let tb1_ident = {
2440+
let old_db = util1.get_database().await?;
2441+
let (table_id, _table_meta) = util1
2442+
.create_table_with(
2443+
|mut meta| {
2444+
meta.schema = schema();
2445+
meta.created_on = created_on;
2446+
meta
2447+
},
2448+
|req| req,
2449+
)
2450+
.await?;
2451+
let cur_db = util1.get_database().await?;
2452+
assert!(old_db.meta.seq < cur_db.meta.seq);
2453+
let got = util1.get_table().await?;
2454+
assert_eq!(table_id, got.ident.table_id);
2455+
got.ident
2456+
};
2457+
2458+
info!("--- create table_b");
2459+
let tb2_ident = {
2460+
let old_db = util2.get_database().await?;
2461+
let (table_id, _table_meta) = util2
2462+
.create_table_with(
2463+
|mut meta| {
2464+
meta.schema = schema();
2465+
meta.created_on = created_on;
2466+
meta
2467+
},
2468+
|req| req,
2469+
)
2470+
.await?;
2471+
let cur_db = util2.get_database().await?;
2472+
assert!(old_db.meta.seq < cur_db.meta.seq);
2473+
let got = util2.get_table().await?;
2474+
assert_eq!(table_id, got.ident.table_id);
2475+
got.ident
2476+
};
2477+
2478+
info!("--- swap tables, both exist, ok");
2479+
{
2480+
let old_db = util1.get_database().await?;
2481+
let _reply = mt.swap_table(swap_req(false)).await?;
2482+
let cur_db = util1.get_database().await?;
2483+
assert!(old_db.meta.seq < cur_db.meta.seq);
2484+
2485+
// Verify tables are swapped
2486+
let got_a = mt
2487+
.get_table((tenant_name, db1_name, tb1_name).into())
2488+
.await?;
2489+
let got_b = mt
2490+
.get_table((tenant_name, db1_name, tb2_name).into())
2491+
.await?;
2492+
2493+
// table_a name should now point to table_b's id
2494+
assert_eq!(tb2_ident.table_id, got_a.ident.table_id);
2495+
// table_b name should now point to table_a's id
2496+
assert_eq!(tb1_ident.table_id, got_b.ident.table_id);
2497+
}
2498+
2499+
info!("--- swap tables again, should restore original mapping");
2500+
{
2501+
let _reply = mt.swap_table(swap_req(false)).await?;
2502+
2503+
// Verify tables are swapped back
2504+
let got_a = mt
2505+
.get_table((tenant_name, db1_name, tb1_name).into())
2506+
.await?;
2507+
let got_b = mt
2508+
.get_table((tenant_name, db1_name, tb2_name).into())
2509+
.await?;
2510+
2511+
// table_a name should point back to table_a's id
2512+
assert_eq!(tb1_ident.table_id, got_a.ident.table_id);
2513+
// table_b name should point back to table_b's id
2514+
assert_eq!(tb2_ident.table_id, got_b.ident.table_id);
2515+
}
2516+
2517+
info!("--- swap non-existent table with if_exists=false, error");
2518+
{
2519+
let swap_req_nonexist = SwapTableReq {
2520+
if_exists: false,
2521+
origin_table: TableNameIdent {
2522+
tenant: tenant.clone(),
2523+
db_name: db1_name.to_string(),
2524+
table_name: "non_existent".to_string(),
2525+
},
2526+
target_table_name: tb2_name.to_string(),
2527+
};
2528+
2529+
let got = mt.swap_table(swap_req_nonexist).await;
2530+
assert!(got.is_err());
2531+
assert_eq!(
2532+
ErrorCode::UNKNOWN_TABLE,
2533+
ErrorCode::from(got.unwrap_err()).code()
2534+
);
2535+
}
2536+
2537+
info!("--- swap non-existent table with if_exists=true, ok");
2538+
{
2539+
let swap_req_nonexist = SwapTableReq {
2540+
if_exists: true,
2541+
origin_table: TableNameIdent {
2542+
tenant: tenant.clone(),
2543+
db_name: db1_name.to_string(),
2544+
table_name: "non_existent".to_string(),
2545+
},
2546+
target_table_name: tb2_name.to_string(),
2547+
};
2548+
2549+
assert!(mt.swap_table(swap_req_nonexist).await.is_ok());
2550+
}
2551+
2552+
Ok(())
2553+
}
2554+
23802555
#[fastrace::trace]
23812556
async fn table_update_meta<MT: SchemaApi + kvapi::KVApi<Error = MetaError>>(
23822557
&self,

src/meta/api/src/table_api.rs

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ use databend_common_meta_app::schema::ListDroppedTableResp;
6969
use databend_common_meta_app::schema::ListTableReq;
7070
use databend_common_meta_app::schema::RenameTableReply;
7171
use databend_common_meta_app::schema::RenameTableReq;
72+
use databend_common_meta_app::schema::SwapTableReply;
73+
use databend_common_meta_app::schema::SwapTableReq;
7274
use databend_common_meta_app::schema::TableCopiedFileNameIdent;
7375
use databend_common_meta_app::schema::TableId;
7476
use databend_common_meta_app::schema::TableIdHistoryIdent;
@@ -712,6 +714,192 @@ where
712714
}
713715
}
714716

717+
#[logcall::logcall]
718+
#[fastrace::trace]
719+
async fn swap_table(&self, req: SwapTableReq) -> Result<SwapTableReply, KVAppError> {
720+
debug!(req :? =(&req); "SchemaApi: {}", func_name!());
721+
722+
let mut trials = txn_backoff(None, func_name!());
723+
loop {
724+
trials.next().unwrap()?.await;
725+
726+
// Get databases
727+
let tenant_dbname_left = req.origin_table.db_name_ident();
728+
729+
let (seq_db_id_left, db_meta_left) =
730+
get_db_or_err(self, &tenant_dbname_left, "swap_table: tenant_dbname_left").await?;
731+
732+
let dbid_tbname_left = DBIdTableName {
733+
db_id: *seq_db_id_left.data,
734+
table_name: req.origin_table.table_name.clone(),
735+
};
736+
737+
let (tb_id_seq_left, table_id_left) = get_u64_value(self, &dbid_tbname_left).await?;
738+
if req.if_exists && tb_id_seq_left == 0 {
739+
return Ok(SwapTableReply {});
740+
}
741+
assert_table_exist(
742+
tb_id_seq_left,
743+
&req.origin_table,
744+
"swap_table: origin_table",
745+
)?;
746+
747+
let dbid_tbname_right = DBIdTableName {
748+
db_id: *seq_db_id_left.data,
749+
table_name: req.target_table_name.clone(),
750+
};
751+
752+
let (tb_id_seq_right, table_id_right) = get_u64_value(self, &dbid_tbname_right).await?;
753+
if req.if_exists && tb_id_seq_right == 0 {
754+
return Ok(SwapTableReply {});
755+
}
756+
assert_table_exist(
757+
tb_id_seq_right,
758+
&TableNameIdent {
759+
tenant: req.origin_table.tenant.clone(),
760+
db_name: req.origin_table.db_name.clone(),
761+
table_name: req.target_table_name.clone(),
762+
},
763+
"swap_table: target_table",
764+
)?;
765+
766+
// Get table id lists
767+
let dbid_tbname_idlist_left = TableIdHistoryIdent {
768+
database_id: *seq_db_id_left.data,
769+
table_name: req.origin_table.table_name.clone(),
770+
};
771+
let dbid_tbname_idlist_right = TableIdHistoryIdent {
772+
database_id: *seq_db_id_left.data,
773+
table_name: req.target_table_name.clone(),
774+
};
775+
776+
let seq_table_history_left = self.get_pb(&dbid_tbname_idlist_left).await?;
777+
let seq_table_history_right = self.get_pb(&dbid_tbname_idlist_right).await?;
778+
779+
let tb_id_list_seq_left = seq_table_history_left.seq();
780+
let tb_id_list_seq_right = seq_table_history_right.seq();
781+
782+
let mut tb_id_list_left = seq_table_history_left
783+
.into_value()
784+
.unwrap_or_else(|| TableIdList::new_with_ids([table_id_left]));
785+
let mut tb_id_list_right = seq_table_history_right
786+
.into_value()
787+
.unwrap_or_else(|| TableIdList::new_with_ids([table_id_right]));
788+
789+
// Validate table IDs in history lists
790+
{
791+
let last_left = tb_id_list_left.last().copied();
792+
if Some(table_id_left) != last_left {
793+
let err_message = format!(
794+
"swap_table {:?} but last table id conflict, id list last: {:?}, current: {}",
795+
req.origin_table, last_left, table_id_left
796+
);
797+
error!("{}", err_message);
798+
return Err(KVAppError::AppError(AppError::UnknownTable(
799+
UnknownTable::new(&req.origin_table.table_name, err_message),
800+
)));
801+
}
802+
803+
let last_right = tb_id_list_right.last().copied();
804+
if Some(table_id_right) != last_right {
805+
let err_message = format!(
806+
"swap_table {:?} but last table id conflict, id list last: {:?}, current: {}",
807+
req.target_table_name, last_right, table_id_right
808+
);
809+
error!("{}", err_message);
810+
return Err(KVAppError::AppError(AppError::UnknownTable(
811+
UnknownTable::new(&req.target_table_name, err_message),
812+
)));
813+
}
814+
}
815+
816+
// Get table id to name mappings
817+
let table_id_to_name_key_left = TableIdToName {
818+
table_id: table_id_left,
819+
};
820+
let table_id_to_name_key_right = TableIdToName {
821+
table_id: table_id_right,
822+
};
823+
let table_id_to_name_seq_left = self.get_seq(&table_id_to_name_key_left).await?;
824+
let table_id_to_name_seq_right = self.get_seq(&table_id_to_name_key_right).await?;
825+
826+
// Prepare new mappings after swap
827+
let db_id_table_name_left = DBIdTableName {
828+
db_id: *seq_db_id_left.data,
829+
table_name: req.origin_table.table_name.clone(),
830+
};
831+
let db_id_table_name_right = DBIdTableName {
832+
db_id: *seq_db_id_left.data,
833+
table_name: req.target_table_name.clone(),
834+
};
835+
836+
{
837+
// Update history lists: remove current table IDs
838+
tb_id_list_left.pop();
839+
tb_id_list_right.pop();
840+
// Add swapped table IDs
841+
tb_id_list_left.append(table_id_right);
842+
tb_id_list_right.append(table_id_left);
843+
844+
let txn = TxnRequest::new(
845+
vec![
846+
// Ensure databases haven't changed
847+
txn_cond_seq(&seq_db_id_left.data, Eq, db_meta_left.seq),
848+
// Ensure table name->table_id mappings haven't changed
849+
txn_cond_seq(&dbid_tbname_left, Eq, tb_id_seq_left),
850+
txn_cond_seq(&dbid_tbname_right, Eq, tb_id_seq_right),
851+
// Ensure table history lists haven't changed
852+
txn_cond_seq(&dbid_tbname_idlist_left, Eq, tb_id_list_seq_left),
853+
txn_cond_seq(&dbid_tbname_idlist_right, Eq, tb_id_list_seq_right),
854+
// Ensure table_id->name mappings haven't changed
855+
txn_cond_seq(&table_id_to_name_key_left, Eq, table_id_to_name_seq_left),
856+
txn_cond_seq(&table_id_to_name_key_right, Eq, table_id_to_name_seq_right),
857+
],
858+
vec![
859+
// Swap table name->table_id mappings
860+
txn_op_put(&dbid_tbname_left, serialize_u64(table_id_right)?), /* origin_table_name -> target_table_id */
861+
txn_op_put(&dbid_tbname_right, serialize_u64(table_id_left)?), /* target_table_name -> origin_table_id */
862+
// Update database metadata sequences
863+
txn_op_put(&seq_db_id_left.data, serialize_struct(&*db_meta_left)?),
864+
// Update table history lists
865+
txn_op_put(
866+
&dbid_tbname_idlist_left,
867+
serialize_struct(&tb_id_list_left)?,
868+
),
869+
txn_op_put(
870+
&dbid_tbname_idlist_right,
871+
serialize_struct(&tb_id_list_right)?,
872+
),
873+
// Update table_id->name mappings
874+
txn_op_put(
875+
&table_id_to_name_key_left,
876+
serialize_struct(&db_id_table_name_right)?,
877+
), // origin_table_id -> target_table_name
878+
txn_op_put(
879+
&table_id_to_name_key_right,
880+
serialize_struct(&db_id_table_name_left)?,
881+
), // target_table_id -> origin_table_name
882+
],
883+
);
884+
885+
let (succ, _responses) = send_txn(self, txn).await?;
886+
887+
debug!(
888+
origin_table :? =(&req.origin_table),
889+
target_table_name :? =(&req.target_table_name),
890+
table_id_left :? =(&table_id_left),
891+
table_id_right :? =(&table_id_right),
892+
succ = succ;
893+
"swap_table"
894+
);
895+
896+
if succ {
897+
return Ok(SwapTableReply {});
898+
}
899+
}
900+
}
901+
}
902+
715903
#[logcall::logcall]
716904
#[fastrace::trace]
717905
async fn truncate_table(

0 commit comments

Comments
 (0)