
Commit d427567

chore: codefmt
1 parent 7551f8c commit d427567

File tree

8 files changed (+76, -51 lines)

Cargo.lock

Lines changed: 1 addition & 0 deletions

src/meta/api/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@ anyhow = { workspace = true }
 async-backtrace = { workspace = true }
 async-trait = { workspace = true }
 chrono = { workspace = true }
+databend-common-ast = { workspace = true }
 databend-common-base = { workspace = true }
 databend-common-exception = { workspace = true }
 databend-common-expression = { workspace = true }

src/meta/api/src/garbage_collection_api.rs

Lines changed: 26 additions & 0 deletions
@@ -16,6 +16,7 @@ use std::ops::Range;
 
 use chrono::DateTime;
 use chrono::Utc;
+use databend_common_ast::ast::AutoIncrement;
 use databend_common_base::vec_ext::VecExt;
 use databend_common_meta_app::app_error::AppError;
 use databend_common_meta_app::app_error::CleanDbIdTableNamesFailed;
@@ -24,6 +25,7 @@ use databend_common_meta_app::principal::OwnershipObject;
 use databend_common_meta_app::principal::TenantOwnershipObjectIdent;
 use databend_common_meta_app::schema::index_id_ident::IndexIdIdent;
 use databend_common_meta_app::schema::index_id_to_name_ident::IndexIdToNameIdent;
+use databend_common_meta_app::schema::sequence_storage::SequenceStorageIdent;
 use databend_common_meta_app::schema::table_niv::TableNIV;
 use databend_common_meta_app::schema::DBIdTableName;
 use databend_common_meta_app::schema::DatabaseId;
@@ -33,6 +35,7 @@ use databend_common_meta_app::schema::DroppedId;
 use databend_common_meta_app::schema::GcDroppedTableReq;
 use databend_common_meta_app::schema::IndexNameIdent;
 use databend_common_meta_app::schema::ListIndexesReq;
+use databend_common_meta_app::schema::SequenceIdent;
 use databend_common_meta_app::schema::TableCopiedFileNameIdent;
 use databend_common_meta_app::schema::TableId;
 use databend_common_meta_app::schema::TableIdHistoryIdent;
@@ -42,6 +45,7 @@ use databend_common_meta_kvapi::kvapi;
 use databend_common_meta_kvapi::kvapi::DirName;
 use databend_common_meta_kvapi::kvapi::Key;
 use databend_common_meta_types::txn_op_response::Response;
+use databend_common_meta_types::ConditionResult;
 use databend_common_meta_types::MetaError;
 use databend_common_meta_types::TxnRequest;
 use display_more::DisplaySliceExt;
@@ -55,6 +59,7 @@ use crate::index_api::IndexApi;
 use crate::kv_app_error::KVAppError;
 use crate::kv_pb_api::KVPbApi;
 use crate::txn_backoff::txn_backoff;
+use crate::txn_cond_seq;
 use crate::txn_condition_util::txn_cond_eq_seq;
 use crate::txn_core_util::send_txn;
 use crate::txn_core_util::txn_delete_exact;
@@ -543,6 +548,27 @@ async fn remove_data_for_dropped_table(
         txn_delete_exact(txn, &id_to_name, seq_name.seq);
     }
 
+    // Remove table auto increment sequences
+    {
+        // clear the sequence associated with auto increment in the table field
+        for table_field in seq_meta.schema.fields() {
+            if table_field.auto_increment_display().is_none() {
+                continue;
+            }
+            let sequence_name =
+                AutoIncrement::sequence_name(table_id.table_id, table_field.column_id);
+
+            let sequence_ident = SequenceIdent::new(tenant, sequence_name);
+            let storage_ident = SequenceStorageIdent::new_from(sequence_ident.clone());
+
+            txn.condition
+                .push(txn_cond_seq(&sequence_ident, ConditionResult::Gt, 0));
+            txn.condition
+                .push(txn_cond_seq(&storage_ident, ConditionResult::Gt, 0));
+            txn.if_then.push(txn_op_del(&sequence_ident));
+            txn.if_then.push(txn_op_del(&storage_ident));
+        }
+    }
     // Remove table ownership
     {
         let table_ownership = OwnershipObject::Table {
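
With this change, the auto-increment cleanup rides on the same meta-service transaction as the rest of the dropped-table GC: for every auto-increment column, the transaction is conditioned on the sequence key and its storage key still existing (seq > 0) and, if so, deletes both, so the sequences disappear atomically with the table metadata or not at all. The sketch below is a minimal, self-contained illustration of that condition-plus-delete pattern; TxnRequest, the key names, and add_sequence_cleanup are simplified stand-ins, not Databend's actual API.

```rust
// Minimal sketch (hypothetical types, not Databend's TxnRequest/SequenceIdent API) of the
// pattern added above: for each auto-increment column, the GC transaction only deletes the
// sequence key and its storage key if they currently exist, and does so atomically with the
// rest of the dropped-table cleanup.
#[derive(Debug, Default)]
struct TxnRequest {
    condition: Vec<String>, // guards that must hold for if_then to run
    if_then: Vec<String>,   // operations executed when all conditions hold
}

fn add_sequence_cleanup(txn: &mut TxnRequest, table_id: u64, column_ids: &[u32]) {
    for column_id in column_ids {
        // Hypothetical key layout; the real name comes from AutoIncrement::sequence_name.
        let sequence_key = format!("__fd_sequence/{table_id}_{column_id}");
        let storage_key = format!("{sequence_key}/storage");

        // Guard: only fire the deletes if the keys exist (seq > 0 in the real code).
        txn.condition.push(format!("seq({sequence_key}) > 0"));
        txn.condition.push(format!("seq({storage_key}) > 0"));

        // Deletes queued alongside the other table-GC operations.
        txn.if_then.push(format!("delete {sequence_key}"));
        txn.if_then.push(format!("delete {storage_key}"));
    }
}

fn main() {
    let mut txn = TxnRequest::default();
    add_sequence_cleanup(&mut txn, 42, &[0, 3]); // e.g. two auto-increment columns
    println!("{txn:#?}");
}
```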

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 41 additions & 5 deletions
@@ -23,6 +23,7 @@ use std::vec;
 use chrono::DateTime;
 use chrono::Duration;
 use chrono::Utc;
+use databend_common_ast::ast::AutoIncrement;
 use databend_common_base::runtime::Runtime;
 use databend_common_base::runtime::TrySpawn;
 use databend_common_exception::ErrorCode;
@@ -5023,7 +5024,19 @@ impl SchemaApiTestSuite {
             Arc::new(TableSchema::new(vec![TableField::new(
                 "number",
                 TableDataType::Number(NumberDataType::UInt64),
-            )]))
+            )
+            .with_auto_increment_display(Some(
+                " AUTOINCREMENT (0, 1) ORDER".to_string(),
+            ))]))
+        };
+        let auto_increments = || {
+            let mut auto_increments = BTreeMap::new();
+            auto_increments.insert(0, AutoIncrement {
+                start: 0,
+                step: 1,
+                is_ordered: true,
+            });
+            auto_increments
         };
 
         let options = || maplit::btreemap! {"opt‐1".into() => "val-1".into()};
@@ -5054,7 +5067,7 @@
             },
             table_meta: drop_table_meta(created_on),
             as_dropped: true,
-            auto_increments: Default::default(),
+            auto_increments: auto_increments(),
             table_properties: None,
             table_partition: None,
         };
@@ -5174,7 +5187,7 @@
             },
             table_meta: table_meta(created_on),
             as_dropped: true,
-            auto_increments: Default::default(),
+            auto_increments: auto_increments(),
             table_properties: None,
             table_partition: None,
         };
@@ -5194,7 +5207,7 @@
             },
             table_meta: drop_table_meta(created_on),
             as_dropped: true,
-            auto_increments: Default::default(),
+            auto_increments: auto_increments(),
             table_properties: None,
             table_partition: None,
         };
@@ -5230,7 +5243,7 @@
             },
             table_meta: drop_table_meta(created_on),
             as_dropped: true,
-            auto_increments: Default::default(),
+            auto_increments: auto_increments(),
             table_properties: None,
             table_partition: None,
         };
@@ -5254,6 +5267,21 @@
         let seqv = mt.get_kv(&table_key.to_string_key()).await?;
         assert!(seqv.is_some() && seqv.unwrap().seq != 0);
 
+        let auto_increment_sequence = SequenceIdent::new(
+            &tenant,
+            AutoIncrement::sequence_name(create_table_as_dropped_resp.table_id, 0),
+        );
+        let auto_increment_sequence_storage =
+            SequenceStorageIdent::new_from(auto_increment_sequence.clone());
+
+        // assert auto increment sequence exists
+        let seqv = mt.get_kv(&auto_increment_sequence.to_string_key()).await?;
+        assert!(seqv.is_some() && seqv.unwrap().seq != 0);
+        let seqv = mt
+            .get_kv(&auto_increment_sequence_storage.to_string_key())
+            .await?;
+        assert!(seqv.is_some() && seqv.unwrap().seq != 0);
+
         // vacuum drop table
         let req = ListDroppedTableReq::new(&tenant);
         let resp = mt.get_drop_table_infos(req).await?;
@@ -5274,6 +5302,14 @@
             };
             let seqv = mt.get_kv(&table_key.to_string_key()).await?;
             assert!(seqv.is_none());
+
+            // assert auto increment sequence has been vacuum
+            let seqv = mt.get_kv(&auto_increment_sequence.to_string_key()).await?;
+            assert!(seqv.is_none());
+            let seqv = mt
+                .get_kv(&auto_increment_sequence_storage.to_string_key())
+                .await?;
+            assert!(seqv.is_none());
         }
 
         Ok(())
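
The test checks the same pair of keys twice: right after the table is created as dropped they must exist with a non-zero seq, and after the vacuum they must be gone. Below is a rough, self-contained sketch of that before/after assertion shape, using a toy in-memory map in place of the real meta store; the ToyKv type and key names are hypothetical.

```rust
use std::collections::HashMap;

// Toy stand-in for the meta KV store; the real test goes through mt.get_kv() and SeqV.
struct ToyKv(HashMap<String, u64>); // value plays the role of the entry's seq

impl ToyKv {
    fn get_kv(&self, key: &str) -> Option<u64> {
        self.0.get(key).copied()
    }
}

fn main() {
    // Hypothetical key names; the real ones come from SequenceIdent / SequenceStorageIdent.
    let seq_key = "seq/auto_incr_42_0".to_string();
    let storage_key = "seq_storage/auto_incr_42_0".to_string();

    let mut kv = ToyKv(HashMap::from([(seq_key.clone(), 7), (storage_key.clone(), 9)]));

    // Before vacuum: both keys exist with a non-zero seq.
    assert!(kv.get_kv(&seq_key).is_some_and(|seq| seq != 0));
    assert!(kv.get_kv(&storage_key).is_some_and(|seq| seq != 0));

    // Vacuum (gc_drop_tables in the real test) removes both keys.
    kv.0.remove(&seq_key);
    kv.0.remove(&storage_key);

    // After vacuum: both lookups return None.
    assert!(kv.get_kv(&seq_key).is_none());
    assert!(kv.get_kv(&storage_key).is_none());

    println!("auto increment sequence keys cleaned up");
}
```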

src/query/ee/src/storages/fuse/operations/handler.rs

Lines changed: 1 addition & 2 deletions
@@ -52,12 +52,11 @@ impl VacuumHandler for RealVacuumHandler {
 
     async fn do_vacuum_drop_tables(
         &self,
-        ctx: Arc<dyn TableContext>,
         threads_nums: usize,
         tables: Vec<Arc<dyn Table>>,
         dry_run_limit: Option<usize>,
     ) -> VacuumDropTablesResult {
-        vacuum_drop_tables(ctx, threads_nums, tables, dry_run_limit).await
+        vacuum_drop_tables(threads_nums, tables, dry_run_limit).await
     }
 
     async fn do_vacuum_temporary_files(

src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs

Lines changed: 5 additions & 40 deletions
@@ -16,41 +16,28 @@ use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Instant;
 
-use databend_common_ast::ast::AutoIncrement;
 use databend_common_base::runtime::execute_futures_in_parallel;
 use databend_common_catalog::table::Table;
-use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
-use databend_common_meta_app::schema::DropSequenceReq;
-use databend_common_meta_app::schema::SequenceIdent;
 use databend_common_meta_app::schema::TableInfo;
 use databend_common_storages_fuse::FuseTable;
 use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropFileInfo;
 use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropTablesResult;
-use databend_query::interpreters::DropSequenceInterpreter;
 use futures_util::TryStreamExt;
 use log::error;
 use log::info;
 use opendal::EntryMode;
 use opendal::Operator;
-
 #[async_backtrace::framed]
 pub async fn do_vacuum_drop_table(
-    ctx: Arc<dyn TableContext>,
     tables: Vec<(TableInfo, Operator)>,
     dry_run_limit: Option<usize>,
 ) -> VacuumDropTablesResult {
     let mut list_files = vec![];
     let mut failed_tables = HashSet::new();
     for (table_info, operator) in tables {
-        let result = vacuum_drop_single_table(
-            ctx.clone(),
-            &table_info,
-            operator,
-            dry_run_limit,
-            &mut list_files,
-        )
-        .await;
+        let result =
+            vacuum_drop_single_table(&table_info, operator, dry_run_limit, &mut list_files).await;
         if result.is_err() {
             let table_id = table_info.ident.table_id;
             failed_tables.insert(table_id);
@@ -64,7 +51,6 @@ pub async fn do_vacuum_drop_table(
 }
 
 async fn vacuum_drop_single_table(
-    ctx: Arc<dyn TableContext>,
     table_info: &TableInfo,
     operator: Operator,
     dry_run_limit: Option<usize>,
@@ -91,25 +77,6 @@ async fn vacuum_drop_single_table(
                 error!("failed to remove all in directory {}: {}", dir, err);
             }
             result?;
-
-            // clear the sequence associated with auto increment in the table field
-            for table_field in table_info.meta.schema.fields() {
-                if table_field.auto_increment_display().is_none() {
-                    continue;
-                }
-                let sequence_name =
-                    AutoIncrement::sequence_name(table_info.ident.table_id, table_field.column_id);
-
-                DropSequenceInterpreter::req_execute(
-                    ctx.as_ref(),
-                    DropSequenceReq {
-                        if_exists: false,
-                        ident: SequenceIdent::new(ctx.get_tenant(), sequence_name),
-                    },
-                    true,
-                )
-                .await?;
-            }
         }
         Some(dry_run_limit) => {
             let mut ds = operator.lister_with(&dir).recursive(true).await?;
@@ -160,7 +127,6 @@ async fn vacuum_drop_single_table(
 
 #[async_backtrace::framed]
 pub async fn vacuum_drop_tables_by_table_info(
-    ctx: Arc<dyn TableContext>,
     num_threads: usize,
     table_infos: Vec<(TableInfo, Operator)>,
     dry_run_limit: Option<usize>,
@@ -182,15 +148,15 @@ pub async fn vacuum_drop_tables_by_table_info(
     );
 
     let result = if batch_size >= table_infos.len() {
-        do_vacuum_drop_table(ctx, table_infos, dry_run_limit).await?
+        do_vacuum_drop_table(table_infos, dry_run_limit).await?
     } else {
         let mut chunks = table_infos.chunks(batch_size);
         let dry_run_limit = dry_run_limit
             .map(|dry_run_limit| (dry_run_limit / num_threads).min(dry_run_limit).max(1));
         let tasks = std::iter::from_fn(move || {
            chunks
                .next()
-                .map(|tables| do_vacuum_drop_table(ctx.clone(), tables.to_vec(), dry_run_limit))
+                .map(|tables| do_vacuum_drop_table(tables.to_vec(), dry_run_limit))
         });
 
         let result = execute_futures_in_parallel(
@@ -234,7 +200,6 @@ pub async fn vacuum_drop_tables_by_table_info(
 
 #[async_backtrace::framed]
 pub async fn vacuum_drop_tables(
-    ctx: Arc<dyn TableContext>,
     threads_nums: usize,
     tables: Vec<Arc<dyn Table>>,
     dry_run_limit: Option<usize>,
@@ -259,5 +224,5 @@ pub async fn vacuum_drop_tables(
         table_infos.push((table_info.clone(), operator));
     }
 
-    vacuum_drop_tables_by_table_info(ctx, threads_nums, table_infos, dry_run_limit).await
+    vacuum_drop_tables_by_table_info(threads_nums, table_infos, dry_run_limit).await
 }
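
The batching around do_vacuum_drop_table is unchanged by this commit: if everything fits in one batch the tables are vacuumed in a single call, otherwise the list is split into chunks that run in parallel, and any dry-run limit is divided across threads but floored at 1. Below is a standalone sketch of that chunking arithmetic, with plain integers standing in for the (TableInfo, Operator) pairs and an assumed batch-size formula (the real batch size is computed elsewhere in this file).

```rust
// Sketch of the chunk-and-parallelize shape used by vacuum_drop_tables_by_table_info.
// `batch_size` here is an assumption; only the per-thread dry-run-limit formula
// mirrors the diff above: dry_run_limit.map(|l| (l / num_threads).min(l).max(1)).
fn plan_batches(
    table_count: usize,
    num_threads: usize,
    dry_run_limit: Option<usize>,
) -> (usize, Option<usize>) {
    // Assumed: spread tables evenly over the available threads (at least one per batch).
    let batch_size = ((table_count + num_threads - 1) / num_threads).max(1);

    // Same arithmetic as the real code: split the limit across threads, floor at 1.
    let per_thread_limit = dry_run_limit.map(|l| (l / num_threads).min(l).max(1));

    (batch_size, per_thread_limit)
}

fn main() {
    let tables: Vec<u64> = (0..10).collect(); // stand-ins for (TableInfo, Operator) pairs
    let (batch_size, limit) = plan_batches(tables.len(), 4, Some(6));

    if batch_size >= tables.len() {
        // Single batch: one do_vacuum_drop_table call in the real code.
        println!("single task: vacuum {tables:?} with dry-run limit {limit:?}");
    } else {
        // Otherwise one task per chunk, executed in parallel by the real code.
        for (i, chunk) in tables.chunks(batch_size).enumerate() {
            println!("task {i}: vacuum {chunk:?} with dry-run limit {limit:?}");
        }
    }
}
```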

src/query/ee_features/vacuum_handler/src/vacuum_handler.rs

Lines changed: 1 addition & 3 deletions
@@ -45,7 +45,6 @@ pub trait VacuumHandler: Sync + Send {
 
     async fn do_vacuum_drop_tables(
         &self,
-        ctx: Arc<dyn TableContext>,
         threads_nums: usize,
         tables: Vec<Arc<dyn Table>>,
         dry_run_limit: Option<usize>,
@@ -101,13 +100,12 @@ impl VacuumHandlerWrapper {
     #[async_backtrace::framed]
     pub async fn do_vacuum_drop_tables(
         &self,
-        ctx: Arc<dyn TableContext>,
         threads_nums: usize,
         tables: Vec<Arc<dyn Table>>,
         dry_run_limit: Option<usize>,
     ) -> VacuumDropTablesResult {
         self.handler
-            .do_vacuum_drop_tables(ctx, threads_nums, tables, dry_run_limit)
+            .do_vacuum_drop_tables(threads_nums, tables, dry_run_limit)
             .await
     }

src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs

Lines changed: 0 additions & 1 deletion
@@ -190,7 +190,6 @@ impl Interpreter for VacuumDropTablesInterpreter {
         let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
         let (files_opt, failed_tables) = handler
             .do_vacuum_drop_tables(
-                self.ctx.clone(),
                 threads_nums,
                 tables,
                 if self.plan.option.dry_run.is_some() {
