Skip to content

Commit d82ac3b

Browse files
oasiskhengfeiyang
andauthored
added more unit tests (#145)
Co-authored-by: Hengfei Yang <hengfei.yang@gmail.com>
1 parent 66457a2 commit d82ac3b

File tree

16 files changed

+415
-16
lines changed

16 files changed

+415
-16
lines changed

src/common/http.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,30 @@ pub fn get_stream_type_from_request(
3838

3939
Ok(stream_type)
4040
}
41+
42+
#[cfg(test)]
43+
mod tests {
44+
use super::*;
45+
#[test]
46+
fn test_get_file_from_cache() {
47+
let key = "type".to_string();
48+
49+
let mut map: HashMap<String, String> = HashMap::new();
50+
map.insert(key.clone(), key.clone());
51+
52+
let resp = get_stream_type_from_request(&Query { 0: map.clone() });
53+
assert!(resp.is_err());
54+
55+
map.insert(key.clone(), "LOGS".to_string());
56+
let resp = get_stream_type_from_request(&Query { 0: map.clone() });
57+
assert_eq!(resp.unwrap(), Some(StreamType::Logs));
58+
59+
map.insert(key.clone(), "METRICS".to_string());
60+
let resp = get_stream_type_from_request(&Query { 0: map.clone() });
61+
assert_eq!(resp.unwrap(), Some(StreamType::Metrics));
62+
63+
map.insert(key.clone(), "TRACES".to_string());
64+
let resp = get_stream_type_from_request(&Query { 0: map.clone() });
65+
assert_eq!(resp.unwrap(), Some(StreamType::Traces));
66+
}
67+
}

src/handler/grpc/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,23 @@ impl From<&meta::common::FileKey> for cluster_rpc::FileKey {
9393
}
9494
}
9595
}
96+
97+
#[cfg(test)]
98+
mod test {
99+
use super::*;
100+
#[actix_web::test]
101+
async fn test_get_file_meta() {
102+
let file_meta = meta::common::FileMeta {
103+
min_ts: 1667978841110,
104+
max_ts: 1667978845354,
105+
records: 300,
106+
original_size: 10,
107+
compressed_size: 1,
108+
};
109+
110+
let rpc_meta = cluster_rpc::FileMeta::from(&file_meta);
111+
let resp = meta::common::FileMeta::from(&rpc_meta);
112+
113+
assert_eq!(file_meta, resp);
114+
}
115+
}

src/infra/cache/file_data.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,32 @@ pub async fn download(file: &str) -> Result<Bytes, anyhow::Error> {
136136
};
137137
Ok(data)
138138
}
139+
140+
#[cfg(test)]
141+
mod tests {
142+
use super::*;
143+
#[test]
144+
fn test_get_file_from_cache() {
145+
let mut file_data = FileData::default();
146+
let file_key = "files/default/logs/olympics/2022/10/03/10/6982652937134804993_1.parquet";
147+
let content = Bytes::from("Some text");
148+
149+
let resp = file_data.set(file_key, content.clone());
150+
assert!(resp.is_ok());
151+
152+
let resp = file_data.get(file_key);
153+
assert_eq!(resp.unwrap(), content);
154+
155+
let resp = set(file_key, content.clone());
156+
assert!(resp.is_ok());
157+
158+
let resp = exist(file_key);
159+
assert_eq!(resp.unwrap(), true);
160+
161+
let resp = get(file_key);
162+
assert_eq!(resp.unwrap(), content);
163+
164+
let resp = stats();
165+
assert!(resp.0 > 0);
166+
}
167+
}

src/infra/cache/stats.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,5 +111,36 @@ mod tests {
111111
let stats = get_stats();
112112
let data = get_stream_stats_len();
113113
assert_eq!(data, stats.len());
114+
115+
let val = StreamStats {
116+
doc_time_min: 1667978841102,
117+
doc_time_max: 1667978845374,
118+
doc_num: 5000,
119+
file_num: 1,
120+
storage_size: 200.00,
121+
compressed_size: 3.00,
122+
};
123+
124+
let _ = set_stream_stats("nexus", "default", "logs", val);
125+
let stats = get_stream_stats("nexus", "default", "logs");
126+
assert_eq!(stats, Some(val));
127+
128+
let file_meta = FileMeta {
129+
min_ts: 1667978841110,
130+
max_ts: 1667978845354,
131+
records: 300,
132+
original_size: 10,
133+
compressed_size: 1,
134+
};
135+
136+
let file_key = "files/nexus/logs/default/2022/10/03/10/6982652937134804993_1.parquet";
137+
let _ = incr_stream_stats(file_key, file_meta);
138+
139+
let stats = get_stream_stats("nexus", "default", "logs");
140+
assert_eq!(stats.unwrap().doc_num, 5300);
141+
142+
let _ = decr_stream_stats(file_key, file_meta);
143+
let stats = get_stream_stats("nexus", "default", "logs");
144+
assert_eq!(stats.unwrap().doc_num, 5000);
114145
}
115146
}

src/infra/db/mod.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,24 +73,21 @@ mod tests {
7373
use super::default;
7474
use bytes::Bytes;
7575

76-
#[tokio::test]
77-
#[ignore]
76+
#[actix_web::test]
7877
async fn test_get() {
7978
let db = default();
8079
db.put("/foo/bar", Bytes::from("hello")).await.unwrap();
8180
let value = db.get("/foo/bar").await.unwrap();
8281
assert_eq!(value, Bytes::from("hello"));
8382
}
8483

85-
#[tokio::test]
86-
#[ignore]
84+
#[actix_web::test]
8785
async fn test_put() {
8886
let db = default();
8987
assert_eq!(true, db.put("/foo/bar", Bytes::from("hello")).await.is_ok());
9088
}
9189

92-
#[tokio::test]
93-
#[ignore]
90+
#[actix_web::test]
9491
async fn test_delete() {
9592
let db = default();
9693
assert_eq!(

src/service/alert_manager.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::meta::search::Request;
2525
use crate::service::search as SearchService;
2626
use crate::service::triggers;
2727

28+
#[cfg_attr(coverage_nightly, no_coverage)]
2829
pub async fn run() -> Result<(), anyhow::Error> {
2930
for item in TRIGGERS.iter() {
3031
if !item.is_ingest_time {
@@ -36,6 +37,8 @@ pub async fn run() -> Result<(), anyhow::Error> {
3637
}
3738
Ok(())
3839
}
40+
41+
#[cfg_attr(coverage_nightly, no_coverage)]
3942
pub async fn handle_triggers(alert_name: &str, trigger: Trigger) {
4043
match super::db::alerts::get(&trigger.org, &trigger.stream, &trigger.alert_name).await {
4144
Ok(result) => {
@@ -71,6 +74,7 @@ pub async fn handle_triggers(alert_name: &str, trigger: Trigger) {
7174
}
7275
}
7376

77+
#[cfg_attr(coverage_nightly, no_coverage)]
7478
pub async fn handle_trigger(alert_name: &str, alert: Alert) {
7579
let mut interval = time::interval(time::Duration::from_secs(
7680
(alert.frequency * 60).try_into().unwrap(),

src/service/db/compact/file_list.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,18 @@ pub async fn set_offset(offset: i64) -> Result<(), anyhow::Error> {
2929
db.put(key, offset.to_string().into()).await?;
3030
Ok(())
3131
}
32+
33+
#[cfg(test)]
34+
mod tests {
35+
36+
use super::*;
37+
38+
#[actix_web::test]
39+
async fn test_files() {
40+
let off_set = 100;
41+
42+
let _ = set_offset(off_set).await;
43+
let resp = get_offset().await;
44+
assert_eq!(resp.unwrap(), off_set);
45+
}
46+
}

src/service/db/compact/files.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,21 @@ pub async fn list_offset() -> Result<Vec<(String, i64)>, anyhow::Error> {
5656
}
5757
Ok(items)
5858
}
59+
60+
#[cfg(test)]
61+
mod tests {
62+
63+
use super::*;
64+
65+
#[actix_web::test]
66+
async fn test_files() {
67+
let off_set = 100;
68+
69+
let _ = set_offset("nexus", "default", "logs".into(), off_set).await;
70+
let resp = get_offset("nexus", "default", "logs".into()).await;
71+
assert_eq!(resp.unwrap(), off_set);
72+
73+
let resp = list_offset().await;
74+
assert!(resp.unwrap().len() > 0);
75+
}
76+
}

src/service/db/file_list/local.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,27 @@ pub async fn broadcast_cache() -> Result<(), anyhow::Error> {
7979
}
8080
super::broadcast::send(&files).await
8181
}
82+
83+
#[cfg(test)]
84+
mod tests {
85+
86+
use super::*;
87+
88+
#[actix_web::test]
89+
async fn test_files() {
90+
let file_key = "files/nexus/logs/default/2022/10/03/10/6982652937134804993_1.parquet";
91+
92+
let file_meta = FileMeta {
93+
min_ts: 1667978841110,
94+
max_ts: 1667978845354,
95+
records: 300,
96+
original_size: 10,
97+
compressed_size: 1,
98+
};
99+
100+
let resp = set(file_key, file_meta, false).await;
101+
//let resp = cache().await;
102+
103+
assert!(resp.is_ok());
104+
}
105+
}

src/service/search/datafusion/exec.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,11 +730,67 @@ async fn register_udf(ctx: &mut SessionContext, _org_id: &str) {
730730

731731
#[cfg(test)]
732732
mod test {
733+
use arrow::array::Int32Array;
734+
use arrow_schema::Field;
735+
use datafusion::from_slice::FromSlice;
736+
733737
use super::*;
734738
#[actix_web::test]
735739
async fn test_register_udf() {
736740
let mut ctx = SessionContext::new();
737741
let _ = register_udf(&mut ctx, "nexus").await;
738742
//assert!(res)
739743
}
744+
745+
#[actix_web::test]
746+
async fn test_merge_write_recordbatch() {
747+
// define a schema.
748+
let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Int32, false)]));
749+
// define data.
750+
let batch = RecordBatch::try_new(
751+
schema.clone(),
752+
vec![Arc::new(Int32Array::from_slice([1, 10, 10, 100]))],
753+
)
754+
.unwrap();
755+
756+
let batch2 = RecordBatch::try_new(
757+
schema.clone(),
758+
vec![Arc::new(Int32Array::from_slice([2, 20, 20, 200]))],
759+
)
760+
.unwrap();
761+
762+
let res = merge_write_recordbatch(&[vec![batch, batch2]]).unwrap();
763+
764+
assert!(!res.is_empty())
765+
}
766+
767+
#[actix_web::test]
768+
async fn test_merge() {
769+
// define a schema.
770+
let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Int32, false)]));
771+
// define data.
772+
let batch = RecordBatch::try_new(
773+
schema.clone(),
774+
vec![Arc::new(Int32Array::from_slice([1, 10, 10, 100]))],
775+
)
776+
.unwrap();
777+
778+
let batch2 = RecordBatch::try_new(
779+
schema.clone(),
780+
vec![Arc::new(Int32Array::from_slice([2, 20, 20, 200]))],
781+
)
782+
.unwrap();
783+
784+
let res = merge(
785+
"dummy",
786+
1,
787+
100,
788+
"select * from tbl limit 10",
789+
&vec![vec![batch, batch2]],
790+
)
791+
.await
792+
.unwrap();
793+
794+
assert!(!res.is_empty())
795+
}
740796
}

src/service/search/datafusion/regexp_udf.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,4 +261,14 @@ mod tests {
261261
let count = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
262262
assert_eq!(count, 2);
263263
}
264+
265+
#[test]
266+
fn parse_escape() {
267+
assert_eq!(is_valid_character_after_escape('0'), true);
268+
assert_eq!(is_valid_character_after_escape('8'), true);
269+
assert_eq!(is_valid_character_after_escape('x'), true);
270+
assert_eq!(is_valid_character_after_escape('p'), true);
271+
assert_eq!(is_valid_character_after_escape('d'), true);
272+
assert_eq!(is_valid_character_after_escape('a'), false);
273+
}
264274
}

src/service/search/datafusion/storage/file_list.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,8 @@ mod tests {
8181

8282
let get_resp = get(session_id).await;
8383
assert!(get_resp.unwrap().len() > 0);
84+
85+
let resp = clear(session_id).await;
86+
assert!(resp.is_ok());
8487
}
8588
}

0 commit comments

Comments
 (0)