
Commit 921b397

fix: enrichment table query (#2603)

1 parent: 28311a0

File tree: 9 files changed (+37, -20 lines)

src/common/infra/db/sqlite.rs

Lines changed: 3 additions & 0 deletions

@@ -245,6 +245,9 @@ impl super::Db for SqliteDb {
             return Err(e.into());
         }
 
+        // release lock
+        drop(client);
+
         // event watch
         if need_watch {
             if let Err(e) = CHANNEL
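
The added `drop(client)` releases the CLIENT_RW mutex guard before the event-watch dispatch runs, instead of holding the SQLite write lock across work that no longer needs it. A minimal sketch of the pattern with tokio's Mutex (names are illustrative, not the actual OpenObserve types):

    use std::sync::Arc;
    use tokio::sync::Mutex;

    async fn put(db: Arc<Mutex<Vec<String>>>, value: String) {
        let mut guard = db.lock().await;
        guard.push(value); // write while holding the lock
        drop(guard);       // release before unrelated slow work
        // ... dispatch watch events here; other writers can now proceed ...
    }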

src/common/infra/file_list/sqlite.rs

Lines changed: 11 additions & 8 deletions

@@ -96,9 +96,9 @@ INSERT INTO file_list (org, stream, date, file, deleted, min_ts, max_ts, records
             return Ok(());
         }
         let chunks = files.chunks(100);
-        let client = CLIENT_RW.clone();
-        let client = client.lock().await;
         for files in chunks {
+            let client = CLIENT_RW.clone();
+            let client = client.lock().await;
             let mut tx = client.begin().await?;
             let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(
                 "INSERT INTO file_list (org, stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size)",
@@ -142,6 +142,9 @@ INSERT INTO file_list (org, stream, date, file, deleted, min_ts, max_ts, records
                 log::error!("[SQLITE] rollback file_list batch add error: {}", e);
                 return Err(e.into());
             }
+            // release lock
+            drop(client);
+            // add file one by one
             for item in files {
                 if let Err(e) = self.add(&item.key, &item.meta).await {
                     log::error!("[SQLITE] single insert file_list add error: {}", e);
@@ -161,10 +164,10 @@ INSERT INTO file_list (org, stream, date, file, deleted, min_ts, max_ts, records
             return Ok(());
         }
         let chunks = files.chunks(100);
-        let client = CLIENT_RW.clone();
-        let client = client.lock().await;
         for files in chunks {
             // get ids of the files
+            let client = CLIENT_RW.clone();
+            let client = client.lock().await;
             let pool = client.clone();
             let mut ids = Vec::with_capacity(files.len());
             for file in files {
@@ -211,9 +214,9 @@ INSERT INTO file_list (org, stream, date, file, deleted, min_ts, max_ts, records
             return Ok(());
         }
         let chunks = files.chunks(100);
-        let client = CLIENT_RW.clone();
-        let client = client.lock().await;
         for files in chunks {
+            let client = CLIENT_RW.clone();
+            let client = client.lock().await;
             let mut tx = client.begin().await?;
             let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(
                 "INSERT INTO file_list_deleted (org, stream, date, file, created_at)",
@@ -246,10 +249,10 @@ INSERT INTO file_list (org, stream, date, file, deleted, min_ts, max_ts, records
             return Ok(());
         }
         let chunks = files.chunks(100);
-        let client = CLIENT_RW.clone();
-        let client = client.lock().await;
         for files in chunks {
             // get ids of the files
+            let client = CLIENT_RW.clone();
+            let client = client.lock().await;
             let pool = client.clone();
             let mut ids = Vec::with_capacity(files.len());
             for file in files {
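
Across all four batch operations the guard is now acquired inside the chunk loop rather than once for the whole batch, so the write lock is held per 100-file chunk and other tasks can interleave between chunks; the fallback path also drops the guard before calling `self.add`, which takes the lock again internally. A sketch of the scoping (hypothetical stand-in types, not the real client):

    use std::sync::Arc;
    use tokio::sync::Mutex;

    async fn batch_add(client_rw: Arc<Mutex<()>>, files: Vec<String>) {
        for chunk in files.chunks(100) {
            // guard lives only for this iteration's transaction
            let client = client_rw.lock().await;
            for file in chunk {
                // ... add `file` to the INSERT batch ...
                let _ = file;
            }
            drop(client); // other writers may run before the next chunk
        }
    }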

src/common/meta/search.rs

Lines changed: 6 additions & 0 deletions

@@ -176,6 +176,7 @@ pub struct Response {
     #[serde(skip_serializing)]
     pub file_count: usize,
     pub scan_size: usize,
+    pub scan_records: usize,
     #[serde(default)]
     #[serde(skip_serializing_if = "String::is_empty")]
     pub response_type: String,
@@ -204,6 +205,7 @@ impl Response {
             size,
             file_count: 0,
             scan_size: 0,
+            scan_records: 0,
             columns: Vec::new(),
             hits: Vec::new(),
             aggs: HashMap::new(),
@@ -252,6 +254,10 @@ impl Response {
         self.scan_size = val;
     }
 
+    pub fn set_scan_records(&mut self, val: usize) {
+        self.scan_records = val;
+    }
+
     pub fn set_session_id(&mut self, session_id: String) {
         self.session_id = session_id;
     }
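
Unlike `file_count`, the new `scan_records` field carries no `#[serde(skip_serializing)]` attribute, so it shows up in serialized search responses alongside `scan_size`. A self-contained sketch of that serde behavior (trimmed-down struct, not the full Response):

    use serde::Serialize;

    #[derive(Serialize)]
    struct Response {
        #[serde(skip_serializing)]
        file_count: usize,
        scan_size: usize,
        scan_records: usize,
    }

    fn main() {
        let r = Response { file_count: 3, scan_size: 1024, scan_records: 50 };
        // prints {"scan_size":1024,"scan_records":50}; file_count is omitted
        println!("{}", serde_json::to_string(&r).unwrap());
    }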

src/ingester/src/wal.rs

Lines changed: 7 additions & 1 deletion

@@ -119,7 +119,13 @@ pub(crate) async fn replay_wal_files() -> Result<()> {
             .unwrap_or_default();
         let key = WriterKey::new(org_id, stream_type);
         let mut memtable = memtable::MemTable::new();
-        let mut reader = wal::Reader::from_path(wal_file).context(WalSnafu)?;
+        let mut reader = match wal::Reader::from_path(wal_file) {
+            Ok(v) => v,
+            Err(e) => {
+                log::error!("Unable to open the wal file err: {}, skip", e);
+                continue;
+            }
+        };
         let mut total = 0;
         let mut i = 0;
         loop {
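
Swapping `?` (via `.context(WalSnafu)?`) for a match means one unreadable WAL file is logged and skipped instead of aborting the entire replay. The general shape of the pattern, shown here with plain std I/O rather than the wal crate:

    use std::fs::File;
    use std::path::PathBuf;

    fn replay_all(paths: &[PathBuf]) {
        for path in paths {
            // a corrupt or missing file should not abort the whole replay
            let reader = match File::open(path) {
                Ok(v) => v,
                Err(e) => {
                    eprintln!("unable to open {}: {}, skip", path.display(), e);
                    continue;
                }
            };
            // ... replay entries from `reader` ...
            let _ = reader;
        }
    }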

src/job/compact.rs

Lines changed: 2 additions & 8 deletions

@@ -90,14 +90,8 @@ async fn run_sync_to_db() -> Result<(), anyhow::Error> {
     interval.tick().await; // trigger the first run
     loop {
         interval.tick().await;
-        let ret = service::db::compact::files::sync_cache_to_db().await;
-        if ret.is_err() {
-            log::error!(
-                "[COMPACTOR] run offset sync cache to db error: {}",
-                ret.err().unwrap()
-            );
-        } else {
-            log::info!("[COMPACTOR] run offset sync cache to db done");
+        if let Err(e) = service::db::compact::files::sync_cache_to_db().await {
+            log::error!("[COMPACTOR] run offset sync cache to db error: {}", e);
         }
     }
 }
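
The rewrite folds the `is_err()` check plus `err().unwrap()` into a single `if let Err(e)`, removing the panic-prone unwrap; note it also drops the per-tick success log, so only failures are logged now. The two shapes side by side (standalone sketch, not the compactor code):

    fn sync_cache_to_db() -> Result<(), std::io::Error> {
        Ok(())
    }

    fn main() {
        // before: inspect the Result twice, then unwrap the error
        let ret = sync_cache_to_db();
        if ret.is_err() {
            eprintln!("error: {}", ret.err().unwrap());
        }

        // after: bind the error once, nothing left to unwrap
        if let Err(e) = sync_cache_to_db() {
            eprintln!("error: {}", e);
        }
    }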

src/service/compact/file_list_deleted.rs

Lines changed: 1 addition & 1 deletion

@@ -156,10 +156,10 @@ pub async fn load_prefix_from_s3(
     let prefix = format!("file_list_deleted/{prefix}/");
     let files = storage::list(&prefix).await?;
     let files_num = files.len();
-    log::info!("Load file_list_deleted gets {} files", files_num);
     if files.is_empty() {
         return Ok(HashMap::default());
     }
+    log::info!("Load file_list_deleted gets {} files", files_num);
 
     let mut tasks = Vec::with_capacity(CONFIG.limit.query_thread_num + 1);
     let chunk_size = std::cmp::max(1, files_num / CONFIG.limit.query_thread_num);

src/service/db/enrichment_table.rs

Lines changed: 1 addition & 1 deletion

@@ -40,7 +40,7 @@ pub async fn get(org_id: &str, name: &str) -> Result<Vec<vrl::value::Value>, any
     let query = meta::search::Query {
         sql: format!("SELECT * FROM \"{name}\" limit {rec_num}"),
         start_time: BASE_TIME.timestamp_micros(),
-        end_time: Utc::now().timestamp(),
+        end_time: Utc::now().timestamp_micros(),
         sql_mode: "full".to_owned(),
         ..Default::default()
    };
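
This one-line change is the fix the commit title names: chrono's `timestamp()` yields seconds since the epoch while `timestamp_micros()` yields microseconds. Since `start_time` is already in microseconds, an `end_time` in seconds made the query's time range effectively empty, so the enrichment table query returned nothing. A minimal chrono demonstration (assuming, as the code does, that BASE_TIME is a post-1970 base date whose microsecond value dwarfs any seconds value):

    use chrono::Utc;

    fn main() {
        let now = Utc::now();
        // seconds since the epoch, on the order of 1.7e9
        println!("timestamp():        {}", now.timestamp());
        // microseconds since the epoch, on the order of 1.7e15
        println!("timestamp_micros(): {}", now.timestamp_micros());
        // with start_time in micros and end_time in seconds,
        // end_time < start_time, so [start_time, end_time] is empty
    }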

src/service/search/grpc/wal.rs

Lines changed: 5 additions & 1 deletion

@@ -93,6 +93,7 @@ pub async fn search_parquet(
         let parquet_meta = read_metadata_from_bytes(&file_data)
             .await
             .unwrap_or_default();
+        scan_stats.records += parquet_meta.records;
         scan_stats.original_size += parquet_meta.original_size;
         if let Some((min_ts, max_ts)) = sql.meta.time_range {
             if parquet_meta.min_ts <= max_ts && parquet_meta.max_ts >= min_ts {
@@ -334,7 +335,10 @@ pub async fn search_memtable(
     let mut batch_groups: HashMap<Arc<Schema>, Vec<RecordBatch>> = HashMap::with_capacity(2);
     for (schema, batch) in batches {
         let entry = batch_groups.entry(schema).or_default();
-        scan_stats.original_size += batch.iter().map(|r| r.data_json_size).sum::<usize>() as i64;
+        for r in batch.iter() {
+            scan_stats.records += r.data.num_rows() as i64;
+            scan_stats.original_size += r.data_json_size as i64;
+        }
         entry.extend(batch.into_iter().map(|r| r.data.clone()));
     }
 
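
Both WAL search paths now count records as well as bytes: the parquet path reads the count from file metadata, and the memtable path folds both sums into one pass over the batch instead of a separate `map().sum()` for size alone. A sketch of the single-pass accounting with arrow's `RecordBatch::num_rows()` (the Entry and ScanStats structs here are stand-ins, not the real types):

    use arrow::record_batch::RecordBatch;

    struct Entry {
        data: RecordBatch,
        data_json_size: usize,
    }

    #[derive(Default)]
    struct ScanStats {
        records: i64,
        original_size: i64,
    }

    fn account(stats: &mut ScanStats, batch: &[Entry]) {
        // one pass collects both row count and uncompressed size
        for r in batch {
            stats.records += r.data.num_rows() as i64;
            stats.original_size += r.data_json_size as i64;
        }
    }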

src/service/search/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -587,6 +587,7 @@ async fn search_in_cluster(mut req: cluster_rpc::SearchRequest) -> Result<search
     result.set_cluster_took(start.elapsed().as_millis() as usize, took_wait);
     result.set_file_count(scan_stats.files as usize);
     result.set_scan_size(scan_stats.original_size as usize);
+    result.set_scan_records(scan_stats.records as usize);
 
     if query_type == "table" {
         result.response_type = "table".to_string();
