Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,17 +189,17 @@ fn extract_partition_metrics(stream_name: &str, partition_lower: DateTime<Utc>)

let events_ingested = EVENTS_INGESTED_DATE
.get_metric_with_label_values(&event_labels)
.map(|metric| metric.get() as u64)
.map(|metric| metric.get())
.unwrap_or(0);

let ingestion_size = EVENTS_INGESTED_SIZE_DATE
.get_metric_with_label_values(&event_labels)
.map(|metric| metric.get() as u64)
.map(|metric| metric.get())
.unwrap_or(0);

let storage_size = EVENTS_STORAGE_SIZE_DATE
.get_metric_with_label_values(&storage_labels)
.map(|metric| metric.get() as u64)
.map(|metric| metric.get())
.unwrap_or(0);

(events_ingested, ingestion_size, storage_size)
Expand Down
17 changes: 12 additions & 5 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::handlers::TelemetryType;
use crate::metrics::{
EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE,
};
use crate::storage::StreamType;
use crate::storage::retention::Retention;
Expand All @@ -46,19 +47,25 @@ pub fn update_stats(
.add(num_rows as i64);
EVENTS_INGESTED_DATE
.with_label_values(&[stream_name, origin, &parsed_date])
.add(num_rows as i64);
.inc_by(num_rows as u64);
EVENTS_INGESTED_SIZE
.with_label_values(&[stream_name, origin])
.add(size as i64);
EVENTS_INGESTED_SIZE_DATE
.with_label_values(&[stream_name, origin, &parsed_date])
.add(size as i64);
.inc_by(size);
LIFETIME_EVENTS_INGESTED
.with_label_values(&[stream_name, origin])
.add(num_rows as i64);
LIFETIME_EVENTS_INGESTED_SIZE
.with_label_values(&[stream_name, origin])
.add(size as i64);
TOTAL_EVENTS_INGESTED_DATE
.with_label_values(&[origin, &parsed_date])
.add(num_rows as i64);
TOTAL_EVENTS_INGESTED_SIZE_DATE
.with_label_values(&[origin, &parsed_date])
.add(size as i64);
}

/// In order to support backward compatability with streams created before v1.6.4,
Expand Down Expand Up @@ -173,12 +180,12 @@ pub fn load_daily_metrics(manifests: &Vec<ManifestItem>, stream_name: &str) {
let storage_size = manifest.storage_size;
EVENTS_INGESTED_DATE
.with_label_values(&[stream_name, "json", &manifest_date])
.set(events_ingested as i64);
.inc_by(events_ingested);
EVENTS_INGESTED_SIZE_DATE
.with_label_values(&[stream_name, "json", &manifest_date])
.set(ingestion_size as i64);
.inc_by(ingestion_size);
EVENTS_STORAGE_SIZE_DATE
.with_label_values(&["data", stream_name, "parquet", &manifest_date])
.set(storage_size as i64);
.inc_by(storage_size);
}
}
95 changes: 75 additions & 20 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,47 @@ pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");

pub static EVENTS_INGESTED: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE),
Opts::new("events_ingested", "Events ingested for a stream").namespace(METRICS_NAMESPACE),
&["stream", "format"],
)
.expect("metric can be created")
});

pub static EVENTS_INGESTED_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new("events_ingested_size", "Events ingested size bytes")
.namespace(METRICS_NAMESPACE),
Opts::new(
"events_ingested_size",
"Events ingested size bytes for a stream",
)
.namespace(METRICS_NAMESPACE),
&["stream", "format"],
)
.expect("metric can be created")
});

pub static STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE),
Opts::new("storage_size", "Storage size bytes for a stream").namespace(METRICS_NAMESPACE),
&["type", "stream", "format"],
)
.expect("metric can be created")
});

pub static EVENTS_DELETED: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new("events_deleted", "Events deleted").namespace(METRICS_NAMESPACE),
Opts::new("events_deleted", "Events deleted for a stream").namespace(METRICS_NAMESPACE),
&["stream", "format"],
)
.expect("metric can be created")
});

pub static EVENTS_DELETED_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new("events_deleted_size", "Events deleted size bytes").namespace(METRICS_NAMESPACE),
Opts::new(
"events_deleted_size",
"Events deleted size bytes for a stream",
)
.namespace(METRICS_NAMESPACE),
&["stream", "format"],
)
.expect("metric can be created")
Expand All @@ -73,7 +80,7 @@ pub static DELETED_EVENTS_STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new(
"deleted_events_storage_size",
"Deleted events storage size bytes",
"Deleted events storage size bytes for a stream",
)
.namespace(METRICS_NAMESPACE),
&["type", "stream", "format"],
Expand All @@ -83,8 +90,11 @@ pub static DELETED_EVENTS_STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {

pub static LIFETIME_EVENTS_INGESTED: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new("lifetime_events_ingested", "Lifetime events ingested")
.namespace(METRICS_NAMESPACE),
Opts::new(
"lifetime_events_ingested",
"Lifetime events ingested for a stream",
)
.namespace(METRICS_NAMESPACE),
&["stream", "format"],
)
.expect("metric can be created")
Expand All @@ -94,7 +104,7 @@ pub static LIFETIME_EVENTS_INGESTED_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new(
"lifetime_events_ingested_size",
"Lifetime events ingested size bytes",
"Lifetime events ingested size bytes for a stream",
)
.namespace(METRICS_NAMESPACE),
&["stream", "format"],
Expand All @@ -106,50 +116,86 @@ pub static LIFETIME_EVENTS_STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new(
"lifetime_events_storage_size",
"Lifetime events storage size bytes",
"Lifetime events storage size bytes for a stream",
)
.namespace(METRICS_NAMESPACE),
&["type", "stream", "format"],
)
.expect("metric can be created")
});

pub static EVENTS_INGESTED_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
pub static EVENTS_INGESTED_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
IntCounterVec::new(
Opts::new(
"events_ingested_date",
"Events ingested on a particular date",
"Events ingested for a stream on a particular date",
)
.namespace(METRICS_NAMESPACE),
&["stream", "format", "date"],
)
.expect("metric can be created")
});

pub static EVENTS_INGESTED_SIZE_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
pub static EVENTS_INGESTED_SIZE_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
IntCounterVec::new(
Opts::new(
"events_ingested_size_date",
"Events ingested size in bytes on a particular date",
"Events ingested size in bytes for a stream on a particular date",
)
.namespace(METRICS_NAMESPACE),
&["stream", "format", "date"],
)
.expect("metric can be created")
});

pub static EVENTS_STORAGE_SIZE_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
pub static EVENTS_STORAGE_SIZE_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
IntCounterVec::new(
Opts::new(
"events_storage_size_date",
"Events storage size in bytes on a particular date",
"Events storage size in bytes for a stream on a particular date",
)
.namespace(METRICS_NAMESPACE),
&["type", "stream", "format", "date"],
)
.expect("metric can be created")
});

pub static TOTAL_EVENTS_INGESTED_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new(
"total_events_ingested_date",
"total events ingested on a particular date",
)
.namespace(METRICS_NAMESPACE),
&["format", "date"],
)
.expect("metric can be created")
});

pub static TOTAL_EVENTS_INGESTED_SIZE_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new(
"total_events_ingested_size_date",
"Total events ingested size in bytes on a particular date",
)
.namespace(METRICS_NAMESPACE),
&["format", "date"],
)
.expect("metric can be created")
});

pub static TOTAL_EVENTS_STORAGE_SIZE_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new(
"total_events_storage_size_date",
"Total events storage size in bytes on a particular date",
)
.namespace(METRICS_NAMESPACE),
&["format", "date"],
)
.expect("metric can be created")
});

pub static STAGING_FILES: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE),
Expand Down Expand Up @@ -219,6 +265,15 @@ fn custom_metrics(registry: &Registry) {
registry
.register(Box::new(EVENTS_STORAGE_SIZE_DATE.clone()))
.expect("metric can be registered");
registry
.register(Box::new(TOTAL_EVENTS_INGESTED_DATE.clone()))
.expect("metric can be registered");
registry
.register(Box::new(TOTAL_EVENTS_INGESTED_SIZE_DATE.clone()))
.expect("metric can be registered");
registry
.register(Box::new(TOTAL_EVENTS_STORAGE_SIZE_DATE.clone()))
.expect("metric can be registered");
registry
.register(Box::new(STAGING_FILES.clone()))
.expect("metric can be registered");
Expand Down
Loading
Loading