Skip to content

Commit d80a219

Browse files
authored
fix: use last triggered time for derived stream period to prevent missing data (#4253)
Fixes #4241 - Adds a new `data` column to `scheduled_jobs` table for the purpose of storing different data as required by different modules. - Uses the last end time of stream as the start time of derived stream calculation. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new `data` field to the `Trigger` structure, allowing for additional information storage in scheduled jobs. - Introduced a new `DerivedTriggerData` struct for improved data handling within the scheduler module. - Enhanced job evaluation and query conditions with new optional `start_time` parameters for greater flexibility. - **Bug Fixes** - Improved error handling for database schema changes to maintain backward compatibility with older versions. - **Refactor** - Simplified logic within various functions to improve code clarity and performance. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 68e2c7c commit d80a219

File tree

9 files changed

+150
-33
lines changed

9 files changed

+150
-33
lines changed

src/infra/src/scheduler/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ pub struct Trigger {
116116
#[serde(skip_serializing_if = "Option::is_none")]
117117
pub end_time: Option<i64>,
118118
pub retries: i32,
119+
pub data: String,
119120
}
120121

121122
/// Initializes the scheduler - creates table and index

src/infra/src/scheduler/mysql.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,21 @@ CREATE TABLE IF NOT EXISTS scheduled_jobs
5858
end_time BIGINT,
5959
retries INT not null,
6060
next_run_at BIGINT not null,
61-
created_at TIMESTAMP default CURRENT_TIMESTAMP
61+
created_at TIMESTAMP default CURRENT_TIMESTAMP,
62+
data LONGTEXT not null
6263
);
6364
"#,
6465
)
6566
.execute(&pool)
6667
.await?;
68+
69+
// create data column for old version <= 0.10.9
70+
let has_data_column = sqlx::query_scalar::<_,i64>("SELECT count(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name='scheduled_jobs' AND column_name='data';")
71+
.fetch_one(&pool)
72+
.await?;
73+
if has_data_column == 0 {
74+
add_data_column().await?;
75+
}
6776
Ok(())
6877
}
6978

@@ -119,8 +128,8 @@ SELECT CAST(COUNT(*) AS SIGNED) AS num FROM scheduled_jobs WHERE module = ?;"#,
119128

120129
if let Err(e) = sqlx::query(
121130
r#"
122-
INSERT IGNORE INTO scheduled_jobs (org, module, module_key, is_realtime, is_silenced, status, retries, next_run_at, start_time, end_time)
123-
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ? , ?);
131+
INSERT IGNORE INTO scheduled_jobs (org, module, module_key, is_realtime, is_silenced, status, retries, next_run_at, start_time, end_time, data)
132+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ? , ?, ?);
124133
"#,
125134
)
126135
.bind(&trigger.org)
@@ -133,6 +142,7 @@ INSERT IGNORE INTO scheduled_jobs (org, module, module_key, is_realtime, is_sile
133142
.bind(trigger.next_run_at)
134143
.bind(0)
135144
.bind(0)
145+
.bind(&trigger.data)
136146
.execute(&mut *tx)
137147
.await
138148
{
@@ -215,14 +225,15 @@ INSERT IGNORE INTO scheduled_jobs (org, module, module_key, is_realtime, is_sile
215225
let pool = CLIENT.clone();
216226
sqlx::query(
217227
r#"UPDATE scheduled_jobs
218-
SET status = ?, retries = ?, next_run_at = ?, is_realtime = ?, is_silenced = ?
228+
SET status = ?, retries = ?, next_run_at = ?, is_realtime = ?, is_silenced = ?, data = ?
219229
WHERE org = ? AND module_key = ? AND module = ?;"#,
220230
)
221231
.bind(trigger.status)
222232
.bind(trigger.retries)
223233
.bind(trigger.next_run_at)
224234
.bind(trigger.is_realtime)
225235
.bind(trigger.is_silenced)
236+
.bind(&trigger.data)
226237
.bind(&trigger.org)
227238
.bind(&trigger.module_key)
228239
.bind(&trigger.module)
@@ -465,3 +476,20 @@ SELECT CAST(COUNT(*) AS SIGNED) AS num FROM scheduled_jobs;"#,
465476
Ok(())
466477
}
467478
}
479+
480+
async fn add_data_column() -> Result<()> {
481+
log::info!("[MYSQL] Adding data column to scheduled_jobs table");
482+
let pool = CLIENT.clone();
483+
if let Err(e) =
484+
sqlx::query(r#"ALTER TABLE scheduled_jobs ADD COLUMN data TEXT NOT NULL DEFAULT ('');"#)
485+
.execute(&pool)
486+
.await
487+
{
488+
if !e.to_string().contains("Duplicate column name") {
489+
// Check for the specific MySQL error code for duplicate column
490+
log::error!("[MYSQL] Unexpected error in adding column: {}", e);
491+
return Err(e.into());
492+
}
493+
}
494+
Ok(())
495+
}

src/infra/src/scheduler/postgres.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,21 @@ CREATE TABLE IF NOT EXISTS scheduled_jobs
5858
end_time BIGINT,
5959
retries INT not null,
6060
next_run_at BIGINT not null,
61-
created_at TIMESTAMP default CURRENT_TIMESTAMP
61+
created_at TIMESTAMP default CURRENT_TIMESTAMP,
62+
data TEXT not null
6263
);
6364
"#,
6465
)
6566
.execute(&pool)
6667
.await?;
68+
69+
// create data column for old version <= 0.10.9
70+
let has_data_column = sqlx::query_scalar::<_,i64>("SELECT count(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name='scheduled_jobs' AND column_name='data';")
71+
.fetch_one(&pool)
72+
.await?;
73+
if has_data_column == 0 {
74+
add_data_column().await?;
75+
}
6776
Ok(())
6877
}
6978

@@ -116,8 +125,8 @@ SELECT COUNT(*)::BIGINT AS num FROM scheduled_jobs WHERE module = $1;"#,
116125

117126
if let Err(e) = sqlx::query(
118127
r#"
119-
INSERT INTO scheduled_jobs (org, module, module_key, is_realtime, is_silenced, status, retries, next_run_at, start_time, end_time)
120-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
128+
INSERT INTO scheduled_jobs (org, module, module_key, is_realtime, is_silenced, status, retries, next_run_at, start_time, end_time, data)
129+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
121130
ON CONFLICT DO NOTHING;
122131
"#,
123132
)
@@ -131,6 +140,7 @@ INSERT INTO scheduled_jobs (org, module, module_key, is_realtime, is_silenced, s
131140
.bind(trigger.next_run_at)
132141
.bind(0)
133142
.bind(0)
143+
.bind(&trigger.data)
134144
.execute(&mut *tx)
135145
.await
136146
{
@@ -213,14 +223,15 @@ INSERT INTO scheduled_jobs (org, module, module_key, is_realtime, is_silenced, s
213223
let pool = CLIENT.clone();
214224
sqlx::query(
215225
r#"UPDATE scheduled_jobs
216-
SET status = $1, retries = $2, next_run_at = $3, is_realtime = $4, is_silenced = $5
217-
WHERE org = $6 AND module_key = $7 AND module = $8;"#,
226+
SET status = $1, retries = $2, next_run_at = $3, is_realtime = $4, is_silenced = $5, data = $6
227+
WHERE org = $7 AND module_key = $8 AND module = $9;"#,
218228
)
219229
.bind(trigger.status)
220230
.bind(trigger.retries)
221231
.bind(trigger.next_run_at)
222232
.bind(trigger.is_realtime)
223233
.bind(trigger.is_silenced)
234+
.bind(&trigger.data)
224235
.bind(&trigger.org)
225236
.bind(&trigger.module_key)
226237
.bind(&trigger.module)
@@ -427,3 +438,18 @@ SELECT COUNT(*)::BIGINT AS num FROM scheduled_jobs;"#,
427438
Ok(())
428439
}
429440
}
441+
442+
async fn add_data_column() -> Result<()> {
443+
log::info!("[POSTGRES] Adding data column to scheduled_jobs table");
444+
let pool = CLIENT.clone();
445+
if let Err(e) = sqlx::query(
446+
r#"ALTER TABLE scheduled_jobs ADD COLUMN IF NOT EXISTS data TEXT NOT NULL DEFAULT '';"#,
447+
)
448+
.execute(&pool)
449+
.await
450+
{
451+
log::error!("[POSTGRES] Error in adding column data: {}", e);
452+
return Err(e.into());
453+
}
454+
Ok(())
455+
}

src/infra/src/scheduler/sqlite.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use async_trait::async_trait;
1717
use chrono::Duration;
1818
use config::utils::json;
19-
use sqlx::Row;
19+
use sqlx::{Pool, Row, Sqlite};
2020

2121
use super::{Trigger, TriggerModule, TriggerStatus, TRIGGERS_KEY};
2222
use crate::{
@@ -62,12 +62,16 @@ CREATE TABLE IF NOT EXISTS scheduled_jobs
6262
end_time BIGINT,
6363
retries INT not null,
6464
next_run_at BIGINT not null,
65-
created_at TIMESTAMP default CURRENT_TIMESTAMP
65+
created_at TIMESTAMP default CURRENT_TIMESTAMP,
66+
data TEXT not null
6667
);
6768
"#,
6869
)
6970
.execute(&*client)
7071
.await?;
72+
73+
// Add data column for old version <= 0.10.9
74+
add_data_column(&client).await?;
7175
Ok(())
7276
}
7377

@@ -117,8 +121,8 @@ SELECT COUNT(*) as num FROM scheduled_jobs WHERE module = $1;"#,
117121

118122
if let Err(e) = sqlx::query(
119123
r#"
120-
INSERT INTO scheduled_jobs (org, module, module_key, is_realtime, is_silenced, status, retries, next_run_at, start_time, end_time)
121-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
124+
INSERT INTO scheduled_jobs (org, module, module_key, is_realtime, is_silenced, status, retries, next_run_at, start_time, end_time, data)
125+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
122126
ON CONFLICT DO NOTHING;
123127
"#,
124128
)
@@ -132,6 +136,7 @@ INSERT INTO scheduled_jobs (org, module, module_key, is_realtime, is_silenced, s
132136
.bind(trigger.next_run_at)
133137
.bind(0)
134138
.bind(0)
139+
.bind(&trigger.data)
135140
.execute(&mut *tx)
136141
.await
137142
{
@@ -229,14 +234,15 @@ INSERT INTO scheduled_jobs (org, module, module_key, is_realtime, is_silenced, s
229234
let client = client.lock().await;
230235
sqlx::query(
231236
r#"UPDATE scheduled_jobs
232-
SET status = $1, retries = $2, next_run_at = $3, is_realtime = $4, is_silenced = $5
233-
WHERE org = $6 AND module_key = $7 AND module = $8;"#,
237+
SET status = $1, retries = $2, next_run_at = $3, is_realtime = $4, is_silenced = $5, data = $6
238+
WHERE org = $7 AND module_key = $8 AND module = $9;"#,
234239
)
235240
.bind(&trigger.status)
236241
.bind(trigger.retries)
237242
.bind(trigger.next_run_at)
238243
.bind(trigger.is_realtime)
239244
.bind(trigger.is_silenced)
245+
.bind(&trigger.data)
240246
.bind(&trigger.org)
241247
.bind(&trigger.module_key)
242248
.bind(&trigger.module)
@@ -436,3 +442,20 @@ SELECT COUNT(*) as num FROM scheduled_jobs;"#,
436442
Ok(())
437443
}
438444
}
445+
446+
async fn add_data_column(client: &Pool<Sqlite>) -> Result<()> {
447+
// Attempt to add the column, ignoring the error if the column already exists
448+
if let Err(e) =
449+
sqlx::query(r#"ALTER TABLE scheduled_jobs ADD COLUMN data TEXT NOT NULL DEFAULT '';"#)
450+
.execute(client)
451+
.await
452+
{
453+
// Check if the error is about the duplicate column
454+
if !e.to_string().contains("duplicate column name") {
455+
// If the error is not about the duplicate column, return it
456+
return Err(e.into());
457+
}
458+
}
459+
460+
Ok(())
461+
}

src/service/alerts/alert.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ impl Alert {
337337
&self.get_stream_params(),
338338
&self.trigger_condition,
339339
&self.query_condition,
340+
None,
340341
)
341342
.await
342343
}

src/service/alerts/derived_streams.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
use std::str::FromStr;
1717

18-
use chrono::{Duration, Utc};
18+
use chrono::Utc;
1919
use config::{
2020
get_config,
2121
utils::json::{Map, Value},
@@ -120,19 +120,15 @@ pub async fn save(
120120
}
121121

122122
// test derived_stream
123-
if let Err(e) = &derived_stream.evaluate(None).await {
123+
if let Err(e) = &derived_stream.evaluate(None, None).await {
124124
return Err(anyhow::anyhow!(
125125
"DerivedStream not saved due to failed test run caused by {}",
126126
e.to_string()
127127
));
128128
};
129129

130130
// Save the trigger to db
131-
let next_run_at = Utc::now().timestamp_micros()
132-
+ Duration::try_minutes(derived_stream.trigger_condition.frequency)
133-
.unwrap()
134-
.num_microseconds()
135-
.unwrap();
131+
let next_run_at = Utc::now().timestamp_micros();
136132
let trigger = db::scheduler::Trigger {
137133
org: derived_stream.source.org_id.to_string(),
138134
module: db::scheduler::TriggerModule::DerivedStream,
@@ -185,12 +181,18 @@ impl DerivedStreamMeta {
185181
pub async fn evaluate(
186182
&self,
187183
row: Option<&Map<String, Value>>,
184+
start_time: Option<i64>,
188185
) -> Result<(Option<Vec<Map<String, Value>>>, i64), anyhow::Error> {
189186
if self.is_real_time {
190187
self.query_condition.evaluate_realtime(row).await
191188
} else {
192189
self.query_condition
193-
.evaluate_scheduled(&self.source, &self.trigger_condition, &self.query_condition)
190+
.evaluate_scheduled(
191+
&self.source,
192+
&self.trigger_condition,
193+
&self.query_condition,
194+
start_time,
195+
)
194196
.await
195197
}
196198
}

src/service/alerts/mod.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ impl QueryCondition {
7272
stream_param: &StreamParams,
7373
trigger_condition: &TriggerCondition,
7474
query_condition: &QueryCondition,
75+
start_time: Option<i64>,
7576
) -> Result<(Option<Vec<Map<String, Value>>>, i64), anyhow::Error> {
7677
let now = Utc::now().timestamp_micros();
7778
let sql = match self.query_type {
@@ -98,11 +99,14 @@ impl QueryCondition {
9899
if v.is_empty() {
99100
return Ok((None, now));
100101
}
101-
let start = now
102-
- Duration::try_minutes(trigger_condition.period)
102+
let start = if let Some(start_time) = start_time {
103+
start_time
104+
} else {
105+
now - Duration::try_minutes(trigger_condition.period)
103106
.unwrap()
104107
.num_microseconds()
105-
.unwrap();
108+
.unwrap()
109+
};
106110
let end = now;
107111
let condition = self.promql_condition.as_ref().unwrap();
108112
let req = promql::MetricsQueryRequest {
@@ -182,11 +186,14 @@ impl QueryCondition {
182186
} else {
183187
std::cmp::max(100, trigger_condition.threshold)
184188
},
185-
start_time: now
186-
- Duration::try_minutes(trigger_condition.period)
189+
start_time: if let Some(start_time) = start_time {
190+
start_time
191+
} else {
192+
now - Duration::try_minutes(trigger_condition.period)
187193
.unwrap()
188194
.num_microseconds()
189-
.unwrap(),
195+
.unwrap()
196+
},
190197
end_time: now,
191198
sort_by: None,
192199
quick_mode: false,

0 commit comments

Comments
 (0)