Skip to content

Commit d0de874

Browse files
authored
fix: pipeline condition node incorrect evaluation (#5219)
fix #5218 Backend: - [x] allow condition node to evaluate `null` and `""`, empty, for both `=` and `!=` conditions - [x] stop ingestion when resolved dynamic stream name is empty **How to Use(In ConditionNode):** To evaluate field value is not empty: `app_name != ""` To evaluate field value is not null: `app_name != null`
1 parent ac35e43 commit d0de874

File tree

3 files changed

+40
-21
lines changed

3 files changed

+40
-21
lines changed

.github/ISSUE_TEMPLATE/2-feature-request.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: 'Feature Request'
1+
name: "Feature Request"
22
description: Suggest a feature for OpenObserve
33
labels: ["✏️ Feature"]
44

@@ -18,7 +18,7 @@ body:
1818
- ingestion
1919
- alerts
2020
- localization
21-
- routing
21+
- pipeline
2222
- metrics
2323
- traces
2424
- other
@@ -43,4 +43,4 @@ body:
4343
attributes:
4444
label: Alternatives considered
4545
validations:
46-
required: true
46+
required: true

src/config/src/meta/stream.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -822,13 +822,14 @@ impl RoutingCondition {
822822
let val = match row.get(&self.column) {
823823
Some(val) => val,
824824
None => {
825+
// field not found -> dropped
825826
return false;
826827
}
827828
};
828829
match val {
829830
Value::String(v) => {
830831
let val = v.as_str();
831-
let con_val = self.value.as_str().unwrap_or_default();
832+
let con_val = self.value.as_str().unwrap_or_default().trim_matches('"'); // "" is interpreted as empty string
832833
match self.operator {
833834
Operator::EqualTo => val == con_val,
834835
Operator::NotEqualTo => val != con_val,
@@ -878,6 +879,10 @@ impl RoutingCondition {
878879
_ => false,
879880
}
880881
}
882+
Value::Null => {
883+
matches!(self.operator, Operator::EqualTo)
884+
&& matches!(&self.value, Value::String(v) if v == "null")
885+
}
881886
_ => false,
882887
}
883888
}

src/service/pipeline/batch_execution.rs

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -242,20 +242,7 @@ impl ExecutablePipeline {
242242
log::debug!("[Pipeline]: starts result collecting job");
243243
let mut count: usize = 0;
244244
let mut results = HashMap::new();
245-
while let Some((idx, mut stream_params, record)) = result_receiver.recv().await {
246-
if stream_params.stream_name.contains("{") {
247-
match resolve_stream_name(&stream_params.stream_name, &record) {
248-
Ok(stream_name) => {
249-
stream_params.stream_name = stream_name.into();
250-
}
251-
Err(e) => {
252-
log::error!(
253-
"[Pipeline]: dynamic stream name detected in destination, but failed to resolve due to {e}. Record dropped."
254-
);
255-
continue;
256-
}
257-
}
258-
}
245+
while let Some((idx, stream_params, record)) = result_receiver.recv().await {
259246
results
260247
.entry(stream_params)
261248
.or_insert(Vec::new())
@@ -528,9 +515,36 @@ async fn process_node(
528515
};
529516
}
530517

531-
if let Err(send_err) = result_sender
532-
.send((idx, stream_params.clone(), record))
533-
.await
518+
let mut destination_stream = stream_params.clone();
519+
if destination_stream.stream_name.contains("{") {
520+
match resolve_stream_name(&destination_stream.stream_name, &record) {
521+
Ok(stream_name) if !stream_name.is_empty() => {
522+
destination_stream.stream_name = stream_name.into();
523+
}
524+
resolve_res => {
525+
let err_msg = if let Err(e) = resolve_res {
526+
format!("Dynamic stream name detected in destination, but failed to resolve due to {e}. Record dropped")
527+
} else {
528+
"Dynamic Stream Name resolved to empty. Record dropped"
529+
.to_string()
530+
};
531+
log::error!("{err_msg}");
532+
if let Err(send_err) = error_sender
533+
.send((node.id.to_string(), node.node_type(), err_msg))
534+
.await
535+
{
536+
log::error!(
537+
"[Pipeline]: LeafNode failed sending errors for collection caused by: {send_err}"
538+
);
539+
break;
540+
}
541+
continue;
542+
}
543+
}
544+
}
545+
546+
if let Err(send_err) =
547+
result_sender.send((idx, destination_stream, record)).await
534548
{
535549
log::error!(
536550
"[Pipeline]: LeafNode errors sending result for collection caused by: {send_err}"

0 commit comments

Comments
 (0)