Skip to content

Commit 80f5d96

Browse files
improve metrics collection
1 parent c521d43 commit 80f5d96

File tree

11 files changed

+569
-558
lines changed

11 files changed

+569
-558
lines changed

src/catalog/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -189,17 +189,17 @@ fn extract_partition_metrics(stream_name: &str, partition_lower: DateTime<Utc>)
189189

190190
let events_ingested = EVENTS_INGESTED_DATE
191191
.get_metric_with_label_values(&event_labels)
192-
.map(|metric| metric.get() as u64)
192+
.map(|metric| metric.get())
193193
.unwrap_or(0);
194194

195195
let ingestion_size = EVENTS_INGESTED_SIZE_DATE
196196
.get_metric_with_label_values(&event_labels)
197-
.map(|metric| metric.get() as u64)
197+
.map(|metric| metric.get())
198198
.unwrap_or(0);
199199

200200
let storage_size = EVENTS_STORAGE_SIZE_DATE
201201
.get_metric_with_label_values(&storage_labels)
202-
.map(|metric| metric.get() as u64)
202+
.map(|metric| metric.get())
203203
.unwrap_or(0);
204204

205205
(events_ingested, ingestion_size, storage_size)

src/metadata.rs

Lines changed: 12 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ use crate::handlers::TelemetryType;
2929
use crate::metrics::{
3030
EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
3131
EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
32+
TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE,
3233
};
3334
use crate::storage::StreamType;
3435
use crate::storage::retention::Retention;
@@ -46,19 +47,25 @@ pub fn update_stats(
4647
.add(num_rows as i64);
4748
EVENTS_INGESTED_DATE
4849
.with_label_values(&[stream_name, origin, &parsed_date])
49-
.add(num_rows as i64);
50+
.inc_by(num_rows as u64);
5051
EVENTS_INGESTED_SIZE
5152
.with_label_values(&[stream_name, origin])
5253
.add(size as i64);
5354
EVENTS_INGESTED_SIZE_DATE
5455
.with_label_values(&[stream_name, origin, &parsed_date])
55-
.add(size as i64);
56+
.inc_by(size);
5657
LIFETIME_EVENTS_INGESTED
5758
.with_label_values(&[stream_name, origin])
5859
.add(num_rows as i64);
5960
LIFETIME_EVENTS_INGESTED_SIZE
6061
.with_label_values(&[stream_name, origin])
6162
.add(size as i64);
63+
TOTAL_EVENTS_INGESTED_DATE
64+
.with_label_values(&[origin, &parsed_date])
65+
.inc_by(num_rows as u64);
66+
TOTAL_EVENTS_INGESTED_SIZE_DATE
67+
.with_label_values(&[origin, &parsed_date])
68+
.inc_by(size);
6269
}
6370

6471
/// In order to support backward compatibility with streams created before v1.6.4,
@@ -173,12 +180,12 @@ pub fn load_daily_metrics(manifests: &Vec<ManifestItem>, stream_name: &str) {
173180
let storage_size = manifest.storage_size;
174181
EVENTS_INGESTED_DATE
175182
.with_label_values(&[stream_name, "json", &manifest_date])
176-
.set(events_ingested as i64);
183+
.inc_by(events_ingested);
177184
EVENTS_INGESTED_SIZE_DATE
178185
.with_label_values(&[stream_name, "json", &manifest_date])
179-
.set(ingestion_size as i64);
186+
.inc_by(ingestion_size);
180187
EVENTS_STORAGE_SIZE_DATE
181188
.with_label_values(&["data", stream_name, "parquet", &manifest_date])
182-
.set(storage_size as i64);
189+
.inc_by(storage_size);
183190
}
184191
}

src/metrics/mod.rs

Lines changed: 75 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -30,40 +30,47 @@ pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");
3030

3131
pub static EVENTS_INGESTED: Lazy<IntGaugeVec> = Lazy::new(|| {
3232
IntGaugeVec::new(
33-
Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE),
33+
Opts::new("events_ingested", "Events ingested for a stream").namespace(METRICS_NAMESPACE),
3434
&["stream", "format"],
3535
)
3636
.expect("metric can be created")
3737
});
3838

3939
pub static EVENTS_INGESTED_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
4040
IntGaugeVec::new(
41-
Opts::new("events_ingested_size", "Events ingested size bytes")
42-
.namespace(METRICS_NAMESPACE),
41+
Opts::new(
42+
"events_ingested_size",
43+
"Events ingested size bytes for a stream",
44+
)
45+
.namespace(METRICS_NAMESPACE),
4346
&["stream", "format"],
4447
)
4548
.expect("metric can be created")
4649
});
4750

4851
pub static STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
4952
IntGaugeVec::new(
50-
Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE),
53+
Opts::new("storage_size", "Storage size bytes for a stream").namespace(METRICS_NAMESPACE),
5154
&["type", "stream", "format"],
5255
)
5356
.expect("metric can be created")
5457
});
5558

5659
pub static EVENTS_DELETED: Lazy<IntGaugeVec> = Lazy::new(|| {
5760
IntGaugeVec::new(
58-
Opts::new("events_deleted", "Events deleted").namespace(METRICS_NAMESPACE),
61+
Opts::new("events_deleted", "Events deleted for a stream").namespace(METRICS_NAMESPACE),
5962
&["stream", "format"],
6063
)
6164
.expect("metric can be created")
6265
});
6366

6467
pub static EVENTS_DELETED_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
6568
IntGaugeVec::new(
66-
Opts::new("events_deleted_size", "Events deleted size bytes").namespace(METRICS_NAMESPACE),
69+
Opts::new(
70+
"events_deleted_size",
71+
"Events deleted size bytes for a stream",
72+
)
73+
.namespace(METRICS_NAMESPACE),
6774
&["stream", "format"],
6875
)
6976
.expect("metric can be created")
@@ -73,7 +80,7 @@ pub static DELETED_EVENTS_STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
7380
IntGaugeVec::new(
7481
Opts::new(
7582
"deleted_events_storage_size",
76-
"Deleted events storage size bytes",
83+
"Deleted events storage size bytes for a stream",
7784
)
7885
.namespace(METRICS_NAMESPACE),
7986
&["type", "stream", "format"],
@@ -83,8 +90,11 @@ pub static DELETED_EVENTS_STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
8390

8491
pub static LIFETIME_EVENTS_INGESTED: Lazy<IntGaugeVec> = Lazy::new(|| {
8592
IntGaugeVec::new(
86-
Opts::new("lifetime_events_ingested", "Lifetime events ingested")
87-
.namespace(METRICS_NAMESPACE),
93+
Opts::new(
94+
"lifetime_events_ingested",
95+
"Lifetime events ingested for a stream",
96+
)
97+
.namespace(METRICS_NAMESPACE),
8898
&["stream", "format"],
8999
)
90100
.expect("metric can be created")
@@ -94,7 +104,7 @@ pub static LIFETIME_EVENTS_INGESTED_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
94104
IntGaugeVec::new(
95105
Opts::new(
96106
"lifetime_events_ingested_size",
97-
"Lifetime events ingested size bytes",
107+
"Lifetime events ingested size bytes for a stream",
98108
)
99109
.namespace(METRICS_NAMESPACE),
100110
&["stream", "format"],
@@ -106,50 +116,86 @@ pub static LIFETIME_EVENTS_STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
106116
IntGaugeVec::new(
107117
Opts::new(
108118
"lifetime_events_storage_size",
109-
"Lifetime events storage size bytes",
119+
"Lifetime events storage size bytes for a stream",
110120
)
111121
.namespace(METRICS_NAMESPACE),
112122
&["type", "stream", "format"],
113123
)
114124
.expect("metric can be created")
115125
});
116126

117-
pub static EVENTS_INGESTED_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
118-
IntGaugeVec::new(
127+
pub static EVENTS_INGESTED_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
128+
IntCounterVec::new(
119129
Opts::new(
120130
"events_ingested_date",
121-
"Events ingested on a particular date",
131+
"Events ingested for a stream on a particular date",
122132
)
123133
.namespace(METRICS_NAMESPACE),
124134
&["stream", "format", "date"],
125135
)
126136
.expect("metric can be created")
127137
});
128138

129-
pub static EVENTS_INGESTED_SIZE_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
130-
IntGaugeVec::new(
139+
pub static EVENTS_INGESTED_SIZE_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
140+
IntCounterVec::new(
131141
Opts::new(
132142
"events_ingested_size_date",
133-
"Events ingested size in bytes on a particular date",
143+
"Events ingested size in bytes for a stream on a particular date",
134144
)
135145
.namespace(METRICS_NAMESPACE),
136146
&["stream", "format", "date"],
137147
)
138148
.expect("metric can be created")
139149
});
140150

141-
pub static EVENTS_STORAGE_SIZE_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
142-
IntGaugeVec::new(
151+
pub static EVENTS_STORAGE_SIZE_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
152+
IntCounterVec::new(
143153
Opts::new(
144154
"events_storage_size_date",
145-
"Events storage size in bytes on a particular date",
155+
"Events storage size in bytes for a stream on a particular date",
146156
)
147157
.namespace(METRICS_NAMESPACE),
148158
&["type", "stream", "format", "date"],
149159
)
150160
.expect("metric can be created")
151161
});
152162

163+
pub static TOTAL_EVENTS_INGESTED_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
164+
IntCounterVec::new(
165+
Opts::new(
166+
"total_events_ingested_date",
167+
"Total events ingested on a particular date",
168+
)
169+
.namespace(METRICS_NAMESPACE),
170+
&["format", "date"],
171+
)
172+
.expect("metric can be created")
173+
});
174+
175+
pub static TOTAL_EVENTS_INGESTED_SIZE_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
176+
IntCounterVec::new(
177+
Opts::new(
178+
"total_events_ingested_size_date",
179+
"Total events ingested size in bytes on a particular date",
180+
)
181+
.namespace(METRICS_NAMESPACE),
182+
&["format", "date"],
183+
)
184+
.expect("metric can be created")
185+
});
186+
187+
pub static TOTAL_EVENTS_STORAGE_SIZE_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
188+
IntCounterVec::new(
189+
Opts::new(
190+
"total_events_storage_size_date",
191+
"Total events storage size in bytes on a particular date",
192+
)
193+
.namespace(METRICS_NAMESPACE),
194+
&["format", "date"],
195+
)
196+
.expect("metric can be created")
197+
});
198+
153199
pub static STAGING_FILES: Lazy<IntGaugeVec> = Lazy::new(|| {
154200
IntGaugeVec::new(
155201
Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE),
@@ -219,6 +265,15 @@ fn custom_metrics(registry: &Registry) {
219265
registry
220266
.register(Box::new(EVENTS_STORAGE_SIZE_DATE.clone()))
221267
.expect("metric can be registered");
268+
registry
269+
.register(Box::new(TOTAL_EVENTS_INGESTED_DATE.clone()))
270+
.expect("metric can be registered");
271+
registry
272+
.register(Box::new(TOTAL_EVENTS_INGESTED_SIZE_DATE.clone()))
273+
.expect("metric can be registered");
274+
registry
275+
.register(Box::new(TOTAL_EVENTS_STORAGE_SIZE_DATE.clone()))
276+
.expect("metric can be registered");
222277
registry
223278
.register(Box::new(STAGING_FILES.clone()))
224279
.expect("metric can be registered");

src/query/listing_table_builder.rs

Lines changed: 19 additions & 61 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@
1616
*
1717
*/
1818

19-
use std::{collections::HashMap, ops::Bound, pin::Pin, sync::Arc};
19+
use std::{ops::Bound, sync::Arc};
2020

2121
use arrow_schema::Schema;
2222
use datafusion::{
@@ -27,9 +27,7 @@ use datafusion::{
2727
error::DataFusionError,
2828
logical_expr::col,
2929
};
30-
use futures_util::{Future, TryStreamExt, stream::FuturesUnordered};
3130
use itertools::Itertools;
32-
use object_store::{ObjectMeta, ObjectStore, path::Path};
3331

3432
use crate::{
3533
OBJECT_STORE_DATA_GRANULARITY, event::DEFAULT_TIMESTAMP_KEY, storage::ObjectStorage,
@@ -56,7 +54,6 @@ impl ListingTableBuilder {
5654
pub async fn populate_via_listing(
5755
self,
5856
storage: Arc<dyn ObjectStorage>,
59-
client: Arc<dyn ObjectStore>,
6057
time_filters: &[PartialTimeFilter],
6158
) -> Result<Self, DataFusionError> {
6259
// Extract the minimum start time from the time filters.
@@ -90,67 +87,28 @@ impl ListingTableBuilder {
9087
let prefixes = TimeRange::new(start_time.and_utc(), end_time.and_utc())
9188
.generate_prefixes(OBJECT_STORE_DATA_GRANULARITY);
9289

93-
// Categorizes prefixes into "minute" and general resolve lists.
94-
let mut minute_resolve = HashMap::<String, Vec<String>>::new();
95-
let mut all_resolve = Vec::new();
90+
// Build all prefixes as relative paths
91+
let prefixes: Vec<_> = prefixes
92+
.into_iter()
93+
.map(|prefix| {
94+
relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix))
95+
})
96+
.collect();
97+
98+
// Use storage.list_dirs_relative for all prefixes and flatten results
99+
let mut listing = Vec::new();
96100
for prefix in prefixes {
97-
let path = relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix));
98-
let prefix = storage.absolute_url(path.as_relative_path()).to_string();
99-
if let Some(pos) = prefix.rfind("minute") {
100-
let hour_prefix = &prefix[..pos];
101-
minute_resolve
102-
.entry(hour_prefix.to_owned())
103-
.or_default()
104-
.push(prefix);
105-
} else {
106-
all_resolve.push(prefix);
101+
match storage.list_dirs_relative(&prefix).await {
102+
Ok(paths) => {
103+
listing.extend(paths.into_iter().map(|p| p.to_string()));
104+
}
105+
Err(e) => {
106+
return Err(DataFusionError::External(Box::new(e)));
107+
}
107108
}
108109
}
109110

110-
/// Resolve all prefixes asynchronously and collect the object metadata.
111-
type ResolveFuture =
112-
Pin<Box<dyn Future<Output = Result<Vec<ObjectMeta>, object_store::Error>> + Send>>;
113-
let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();
114-
for (listing_prefix, prefixes) in minute_resolve {
115-
let client = Arc::clone(&client);
116-
tasks.push(Box::pin(async move {
117-
let path = Path::from(listing_prefix);
118-
let mut objects = client.list(Some(&path)).try_collect::<Vec<_>>().await?;
119-
120-
objects.retain(|obj| {
121-
prefixes.iter().any(|prefix| {
122-
obj.location
123-
.prefix_matches(&object_store::path::Path::from(prefix.as_ref()))
124-
})
125-
});
126-
127-
Ok(objects)
128-
}));
129-
}
130-
131-
for prefix in all_resolve {
132-
let client = Arc::clone(&client);
133-
tasks.push(Box::pin(async move {
134-
client
135-
.list(Some(&object_store::path::Path::from(prefix)))
136-
.try_collect::<Vec<_>>()
137-
.await
138-
}));
139-
}
140-
141-
let listing = tasks
142-
.try_collect::<Vec<Vec<ObjectMeta>>>()
143-
.await
144-
.map_err(|err| DataFusionError::External(Box::new(err)))?
145-
.into_iter()
146-
.flat_map(|res| {
147-
res.into_iter()
148-
.map(|obj| obj.location.to_string())
149-
.collect::<Vec<String>>()
150-
})
151-
.sorted()
152-
.rev()
153-
.collect_vec();
111+
let listing = listing.into_iter().sorted().rev().collect_vec();
154112

155113
Ok(Self {
156114
stream: self.stream,

0 commit comments

Comments
 (0)