
Commit 0ebaf21

nikhilsaikethe, taimingl, and bjp232004 authored
feat: Pipeline integration v2 (#4732)
This PR implements the new design of **Pipeline** introduced by #4432.

---

**Two types of pipeline, differentiated by source**

Type I: **Realtime**
- the source is a stream (Logs, Metrics, & Traces)
- takes effect while the source stream is being ingested

Type II: **Scheduled** (aka DerivedStream)
- the source is a SQL query
- takes effect on the given schedule

The new pipeline is represented as a graph consisting of different types of nodes connected by edges.

**Pipeline Data Model**
```rs
pub struct Pipeline {
    pub id: String,
    pub version: i32,
    pub enabled: bool,
    pub org: String,
    pub name: String,
    pub description: String,
    pub source: PipelineSource,
    pub nodes: Vec<Node>,
    pub edges: Vec<Edge>,
}
```

**4 types of node** (see the sketch after this message)
I. StreamNode: used either as the source node of a realtime pipeline or as a destination node
II. QueryNode: used as the source node of a scheduled pipeline
III. FunctionNode: executes VRL functions
IV. ConditionNode: checks routing conditions

Rules applied when validating a newly created pipeline:
1. the nodes list is non-empty
2. the edges list is non-empty
3. the 1st node in the nodes list is either a StreamNode or a QueryNode
4. every ConditionNode in the nodes list has non-empty `conditions`
5. all leaf nodes are of type StreamNode
6. within the same branch, a FunctionNode with `after_flattened` unchecked cannot follow one with `after_flattened` checked (once a branch flattens its records, a downstream function cannot expect unflattened input)

Pipeline execution is implemented with the struct `ExecutablePipeline`, a ready-to-execute pipeline object cached in memory and built from the Pipeline objects edited on the UI side. An `ExecutablePipeline` object
- builds the relationships among all the nodes based on the edges
- topologically sorts the nodes by level to determine which node to process first
- registers all the VRL function nodes once

An `ExecutablePipeline` object processes ingested records in batches. Starting from the source node, each connecting edge has a channel between the parent and the child node, used for passing records. This approach enables parallel batch processing (see the channel sketch after this message).

---

**Deprecated Features**
1. the previous implementation of pipelines
2. Function x Stream associations
   - Functions can still be added/edited/removed as before
   - A pipeline must now be created to apply chosen functions to the desired stream

Once this PR is merged, the new release will automatically migrate **old pipelines** and **Function x Stream associations** to the new pipeline format; the results appear in the `Stream Pipelines` tab with `Migrated-` prefixed names.

**_Note: Auto-generated pipelines are paused by default. Please verify the pipelines before enabling them._**

Co-authored-by: Taiming Liu <liutaiming3@gmail.com>
Co-authored-by: Bhargav <BJP232004@GMAIL.COM>
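To make the graph shape concrete, here is a minimal, self-contained sketch of the four node kinds and an edge list. All type, variant, and field names below are illustrative stand-ins, not the PR's actual definitions.

```rs
// Hypothetical stand-ins for the PR's node/edge types, for illustration only.
#[derive(Debug, Clone)]
enum Node {
    /// Realtime source, or a destination (every leaf must be a StreamNode).
    Stream { org: String, stream: String },
    /// SQL source of a scheduled (DerivedStream) pipeline.
    Query { sql: String },
    /// Applies a VRL function; `after_flattened` controls whether records
    /// are flattened before the function runs (validation rule 6 above).
    Function { name: String, after_flattened: bool },
    /// Forwards a record down this branch only when its conditions match.
    Condition { conditions: Vec<String> },
}

/// Directed edge between two nodes, referenced by index into the nodes list.
struct Edge {
    from: usize,
    to: usize,
}

fn main() {
    // source -> condition -> function -> destination
    let nodes = vec![
        Node::Stream { org: "default".into(), stream: "app_logs".into() },
        Node::Condition { conditions: vec!["level == 'error'".into()] },
        Node::Function { name: "redact_pii".into(), after_flattened: true },
        Node::Stream { org: "default".into(), stream: "error_logs".into() },
    ];
    let edges = [Edge { from: 0, to: 1 }, Edge { from: 1, to: 2 }, Edge { from: 2, to: 3 }];
    println!("{} nodes, {} edges", nodes.len(), edges.len());
}
```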
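And a rough sketch of the batch-execution idea: one channel per edge, one worker per node, with batches flowing from parent to child so stages run in parallel. This uses std threads and `mpsc` purely for illustration; the real `ExecutablePipeline` is built differently and handles arbitrary graphs, not just a linear chain.

```rs
// Illustrative only: a linear source -> function -> destination pipeline
// where each edge is a channel and each node runs as its own worker.
use std::{sync::mpsc, thread};

fn main() {
    let (src_tx, func_rx) = mpsc::channel::<Vec<String>>();
    let (func_tx, dest_rx) = mpsc::channel::<Vec<String>>();

    // FunctionNode stage: transform every record in the batch.
    let func = thread::spawn(move || {
        for batch in func_rx {
            let out: Vec<String> = batch.into_iter().map(|r| r.to_uppercase()).collect();
            if func_tx.send(out).is_err() {
                break; // destination hung up
            }
        }
    });

    // Destination StreamNode stage: persist the batch.
    let dest = thread::spawn(move || {
        for batch in dest_rx {
            println!("writing {} records to the destination stream", batch.len());
        }
    });

    // Source StreamNode: forward an ingested batch into the pipeline.
    src_tx.send(vec!["a".into(), "b".into()]).unwrap();
    drop(src_tx); // close the channel so downstream workers finish

    func.join().unwrap();
    dest.join().unwrap();
}
```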
1 parent 61ce5a0 commit 0ebaf21

File tree

108 files changed (+10572 additions, −4387 deletions)


Cargo.lock

Lines changed: 2 additions & 0 deletions
(Generated file; diff not rendered.)

src/cli/basic/cli.rs

Lines changed: 13 additions & 0 deletions
```diff
@@ -99,6 +99,14 @@ pub async fn cli() -> Result<bool, anyhow::Error> {
                     .help("migrate to: sqlite, etcd, mysql, postgresql"),
             ]),
         clap::Command::new("migrate-dashboards").about("migrate-dashboards"),
+        clap::Command::new("migrate-pipeline").about("migrate pipelines")
+            .arg(
+                clap::Arg::new("drop-table")
+                    .long("drop-table")
+                    .required(false)
+                    .num_args(0)
+                    .help("Drop existing Pipeline table first before migrating")
+            ),
         clap::Command::new("delete-parquet")
             .about("delete parquet files from s3 and file_list")
             .arg(
@@ -246,6 +254,11 @@ pub async fn cli() -> Result<bool, anyhow::Error> {
             println!("Running migration dashboard");
             migration::dashboards::run().await?
         }
+        "migrate-pipeline" => {
+            println!("Running migration pipeline");
+            let drop_table = command.get_flag("drop-table");
+            migration::pipeline_func::run(drop_table).await?;
+        }
         "delete-parquet" => {
             let file = command.get_one::<String>("file").unwrap();
             match file_list::delete_parquet_file(file, true).await {
```
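Assuming the usual release binary name (not part of this diff), the migration could then be run by hand with something like `openobserve migrate-pipeline --drop-table`; per the clap definition above, `--drop-table` takes no value and makes the migration drop the existing Pipeline table first.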

src/common/infra/config.rs

Lines changed: 13 additions & 13 deletions
```diff
@@ -15,7 +15,14 @@
 
 use std::sync::Arc;
 
-use config::{RwAHashMap, RwHashMap};
+use config::{
+    meta::{
+        alerts::{alert::Alert, destinations::Destination, templates::Template},
+        function::Transform,
+        stream::StreamParams,
+    },
+    RwAHashMap, RwHashMap,
+};
 use dashmap::DashMap;
 use hashbrown::HashMap;
 use infra::table::short_urls::ShortUrlRecord;
@@ -25,18 +32,12 @@ use vector_enrichment::TableRegistry;
 
 use crate::{
     common::meta::{
-        alerts::{alert::Alert, destinations::Destination, templates::Template},
-        dashboards::reports,
-        functions::{StreamFunctionsList, Transform},
-        maxmind::MaxmindClient,
-        organization::OrganizationSetting,
-        pipelines::PipeLine,
-        prom::ClusterLeader,
-        syslog::SyslogRoute,
-        user::User,
+        dashboards::reports, maxmind::MaxmindClient, organization::OrganizationSetting,
+        prom::ClusterLeader, syslog::SyslogRoute, user::User,
     },
     service::{
         db::scheduler as db_scheduler, enrichment::StreamTable, enrichment_table::geoip::Geoip,
+        pipeline::batch_execution::ExecutablePipeline,
     },
 };
 
@@ -47,8 +48,6 @@ pub static BUILD_DATE: &str = env!("GIT_BUILD_DATE");
 
 // global cache variables
 pub static KVS: Lazy<RwHashMap<String, bytes::Bytes>> = Lazy::new(Default::default);
-pub static STREAM_FUNCTIONS: Lazy<RwHashMap<String, StreamFunctionsList>> =
-    Lazy::new(DashMap::default);
 pub static QUERY_FUNCTIONS: Lazy<RwHashMap<String, Transform>> = Lazy::new(DashMap::default);
 pub static USERS: Lazy<RwHashMap<String, User>> = Lazy::new(DashMap::default);
 pub static USERS_RUM_TOKEN: Lazy<Arc<RwHashMap<String, User>>> =
@@ -83,6 +82,7 @@ pub static GEOIP_CITY_TABLE: Lazy<Arc<RwLock<Option<Geoip>>>> =
 pub static GEOIP_ASN_TABLE: Lazy<Arc<RwLock<Option<Geoip>>>> =
     Lazy::new(|| Arc::new(RwLock::new(None)));
 
+pub static STREAM_EXECUTABLE_PIPELINES: Lazy<RwAHashMap<StreamParams, ExecutablePipeline>> =
+    Lazy::new(Default::default);
 pub static USER_SESSIONS: Lazy<RwHashMap<String, String>> = Lazy::new(Default::default);
-pub static STREAM_PIPELINES: Lazy<RwHashMap<String, PipeLine>> = Lazy::new(DashMap::default);
 pub static SHORT_URLS: Lazy<RwHashMap<String, ShortUrlRecord>> = Lazy::new(DashMap::default);
```
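For context, a minimal sketch (with stand-in types) of the lookup pattern the new `STREAM_EXECUTABLE_PIPELINES` cache enables at ingest time: keyed by stream, returning a ready-to-run pipeline when one is registered. The real cache is an async `RwAHashMap` holding the actual `ExecutablePipeline` built from the saved pipeline graph.

```rs
use std::collections::HashMap;

// Stand-ins for config::meta::stream::StreamParams and
// service::pipeline::batch_execution::ExecutablePipeline.
#[derive(Hash, PartialEq, Eq)]
struct StreamParams {
    org_id: String,
    stream_name: String,
}

struct ExecutablePipeline {
    num_nodes: usize,
}

/// Ingest-time lookup: run the cached pipeline if the stream has one,
/// otherwise let the batch pass through unchanged.
fn apply_pipeline(
    cache: &HashMap<StreamParams, ExecutablePipeline>,
    key: &StreamParams,
    batch: Vec<String>,
) -> Vec<String> {
    match cache.get(key) {
        // The real code would await the pipeline's batch processing here.
        Some(p) => {
            println!("running {}-node pipeline on {} records", p.num_nodes, batch.len());
            batch
        }
        None => batch,
    }
}

fn main() {
    let mut cache = HashMap::new();
    let key = StreamParams { org_id: "default".into(), stream_name: "app_logs".into() };
    cache.insert(key, ExecutablePipeline { num_nodes: 4 });

    let key = StreamParams { org_id: "default".into(), stream_name: "app_logs".into() };
    apply_pipeline(&cache, &key, vec!["rec1".into(), "rec2".into()]);
}
```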

src/common/meta/mod.rs

Lines changed: 0 additions & 2 deletions
```diff
@@ -13,10 +13,8 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-pub mod alerts;
 pub mod authz;
 pub mod dashboards;
-pub mod functions;
 pub mod http;
 pub mod ingestion;
 pub mod maxmind;
```

src/common/meta/organization.rs

Lines changed: 1 addition & 2 deletions
```diff
@@ -13,11 +13,10 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+use config::meta::{alerts::alert::Alert, function::Transform};
 use serde::{Deserialize, Serialize};
 use utoipa::ToSchema;
 
-use super::{alerts::alert::Alert, functions::Transform};
-
 pub const DEFAULT_ORG: &str = "default";
 pub const CUSTOM: &str = "custom";
 pub const THRESHOLD: i64 = 9383939382;
```

src/common/meta/pipelines.rs

Lines changed: 4 additions & 44 deletions
```diff
@@ -16,16 +16,15 @@
 use std::collections::HashMap;
 
 use config::{
-    meta::stream::{RoutingCondition, StreamType},
+    meta::{
+        alerts::derived_streams::DerivedStreamMeta,
+        stream::{RoutingCondition, StreamType},
+    },
     utils::json::Value,
 };
 use serde::{Deserialize, Serialize};
 use utoipa::ToSchema;
 
-use crate::common::meta::{
-    alerts::derived_streams::DerivedStreamMeta, functions::StreamFunctionsList,
-};
-
 #[derive(Clone, Debug, Serialize, Deserialize, ToSchema, PartialEq)]
 pub struct PipeLine {
     pub name: String,
@@ -42,42 +41,3 @@ pub struct PipeLine {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub meta: Option<HashMap<String, Value>>,
 }
-
-impl PipeLine {
-    pub fn into_response(self, functions: Option<StreamFunctionsList>) -> PipeLineResponse {
-        PipeLineResponse {
-            name: self.name,
-            description: self.description,
-            stream_name: self.stream_name,
-            stream_type: self.stream_type,
-            routing: self.routing,
-            derived_streams: self.derived_streams,
-            functions,
-            meta: self.meta,
-        }
-    }
-}
-
-#[derive(Clone, Debug, Serialize, Deserialize, ToSchema, PartialEq)]
-pub struct PipeLineResponse {
-    pub name: String,
-    #[serde(default)]
-    pub description: String,
-    #[serde(default)]
-    pub stream_name: String,
-    #[serde(default)]
-    pub stream_type: StreamType,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub routing: Option<HashMap<String, Vec<RoutingCondition>>>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub functions: Option<StreamFunctionsList>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub derived_streams: Option<Vec<DerivedStreamMeta>>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub meta: Option<HashMap<String, Value>>,
-}
-
-#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
-pub struct PipeLineList {
-    pub list: Vec<PipeLineResponse>,
-}
```

src/common/meta/telemetry.rs

Lines changed: 0 additions & 6 deletions
```diff
@@ -251,12 +251,6 @@ pub async fn add_zo_info(mut data: HashMap<String, json::Value>) -> HashMap<Stri
         format!("{:.0}", (traces_compressed_size / SIZE_IN_MB)).into(),
     );
 
-    let iter = STREAM_FUNCTIONS.iter().clone();
-    let mut ingest_functions = 0;
-    for item in iter {
-        ingest_functions += item.value().list.len()
-    }
-    data.insert("num_ingest_functions".to_string(), ingest_functions.into());
     data.insert(
         "num_query_functions".to_string(),
         QUERY_FUNCTIONS.len().into(),
```

src/common/migration/mod.rs

Lines changed: 13 additions & 0 deletions
```diff
@@ -20,6 +20,7 @@ use version_compare::Version;
 pub mod dashboards;
 pub mod file_list;
 pub mod meta;
+pub mod pipeline_func;
 pub mod schema;
 
 pub async fn check_upgrade(old_ver: &str, new_ver: &str) -> Result<(), anyhow::Error> {
@@ -46,6 +47,11 @@ pub async fn check_upgrade(old_ver: &str, new_ver: &str) -> Result<(), anyhow::E
         upgrade_092_093().await?;
     }
 
+    let v131 = Version::from("v0.13.1").unwrap();
+    if old_ver < v131 {
+        upgrade_130_131().await?;
+    }
+
     Ok(())
 }
 
@@ -66,6 +72,13 @@ async fn upgrade_092_093() -> Result<(), anyhow::Error> {
     Ok(())
 }
 
+async fn upgrade_130_131() -> Result<(), anyhow::Error> {
+    // migrate pipelines and function associations
+    pipeline_func::run(false).await?;
+
+    Ok(())
+}
+
 pub async fn upgrade_resource_names() -> Result<(), anyhow::Error> {
     // The below migration requires ofga init ready, but on Router node,
     // we don't initialize ofga, hence the migration should not run on router
```
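Note that this automatic upgrade path calls `pipeline_func::run(false)`, i.e. it never drops the existing table, and it only fires when upgrading from a version older than v0.13.1; dropping the table first is available only via the manual `migrate-pipeline --drop-table` command added in cli.rs above.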
