Improve metrics collection #1415
Conversation
Walkthrough
Converts several per-date metrics from gauges to counters and adds total-per-date gauges; centralizes storage telemetry into global histograms/counters with provider labels and error-status mapping; instruments storage backends; and refactors query/listing code to use the ObjectStorage abstraction, updating related signatures.
Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Q as Query Layer
participant S as ObjectStorage (abstraction)
participant P as Provider (s3/gcs/azure/localfs)
participant M as Metrics
rect rgba(200,220,255,0.25)
Note over Q,S: Listing & manifest collection (new flow)
Q->>S: list_dirs_relative(stream/prefix)
S->>P: LIST
P-->>S: paths
S-->>Q: relative paths
Q->>S: get_object(manifest_path) [concurrent]
S->>P: GET manifest
P-->>S: bytes / error
S->>M: STORAGE_REQUEST_RESPONSE_TIME{provider, "GET", status}
S->>M: STORAGE_FILES_SCANNED{provider,"GET"} += 1
Q-->>Q: parse manifests
end
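For orientation, here is a minimal Rust sketch of the recording step shown in the diagram above, assuming the prometheus and once_cell crates. The metric names and the [provider, method, status] label set come from this review; the namespace, buckets, and exact definitions in the PR differ, so treat this as illustrative rather than the PR's code.

```rust
use once_cell::sync::Lazy;
use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, Opts};

// Global, provider-agnostic telemetry, as described in the walkthrough.
static STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
    HistogramVec::new(
        HistogramOpts::new("storage_request_response_time", "Storage Request Latency"),
        &["provider", "method", "status"],
    )
    .expect("metric can be created")
});

static STORAGE_FILES_SCANNED: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new("storage_files_scanned", "Files touched per storage operation"),
        &["provider", "operation"],
    )
    .expect("metric can be created")
});

// Record one GET attempt: latency bucketed by provider/method/status, plus a
// files-scanned increment for the same provider/operation pair.
fn record_get(provider: &str, status: &str, elapsed_secs: f64) {
    STORAGE_REQUEST_RESPONSE_TIME
        .with_label_values(&[provider, "GET", status])
        .observe(elapsed_secs);
    STORAGE_FILES_SCANNED
        .with_label_values(&[provider, "GET"])
        .inc();
}
```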
sequenceDiagram
autonumber
participant App as Stats/Update
participant M as Metrics Registry
Note over App,M: Per-date event accounting
App->>M: EVENTS_INGESTED_DATE.inc_by(n)
App->>M: EVENTS_INGESTED_SIZE_DATE.inc_by(bytes)
App->>M: EVENTS_STORAGE_SIZE_DATE.inc_by(bytes)
App->>M: TOTAL_EVENTS_INGESTED_DATE.add_or_inc(...) %% total-per-date gauge updated
App->>M: TOTAL_EVENTS_STORAGE_SIZE_DATE.add_or_inc(...)
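And a sketch of the per-date accounting in the second diagram, again assuming the prometheus crate. The per-stream label set and the record_ingest helper are assumptions made for illustration; the PR's add_or_inc helper on the total-per-date gauge is stood in for here by a plain gauge add.

```rust
use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, IntGaugeVec, Opts};

// Per-stream, per-date counter: monotonic, so replays and restarts only add.
static EVENTS_INGESTED_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new("events_ingested_date", "Events ingested for a stream on a date"),
        &["stream", "format", "date"],
    )
    .expect("metric can be created")
});

// Total-per-date gauge aggregated across all streams (no `stream` label).
static TOTAL_EVENTS_INGESTED_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
    IntGaugeVec::new(
        Opts::new("total_events_ingested_date", "Total events ingested on a date"),
        &["format", "date"],
    )
    .expect("metric can be created")
});

// Called from the stats/update path for each ingested batch.
fn record_ingest(stream: &str, format: &str, date: &str, n: u64) {
    EVENTS_INGESTED_DATE
        .with_label_values(&[stream, format, date])
        .inc_by(n);
    TOTAL_EVENTS_INGESTED_DATE
        .with_label_values(&[format, date])
        .add(n as i64);
}
```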
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
src/query/stream_schema_provider.rs (1)
578-585: Fix type mismatch: ObjectStoreUrl::parse expects &str, not url::Url. This won’t compile. Pass a string slice (or format!) instead.
- ObjectStoreUrl::parse(storage.store_url()).unwrap(),
+ ObjectStoreUrl::parse(storage.store_url().as_str()).unwrap(),
src/storage/gcs.rs (1)
707-756: Duplicate GET timing and counting inside get_objects; rely on _get_object metrics.
self.get_object(...) already records GET timing and counts. The subsequent REQUEST_RESPONSE_TIME GET observation and FILES_SCANNED GET inc here double-count, and the timing uses the LIST stopwatch, not the GET duration.
- STORAGE_REQUEST_RESPONSE_TIME
-     .with_label_values(&["gcs", "GET", "200"])
-     .observe(list_start.elapsed().as_secs_f64());
- STORAGE_FILES_SCANNED
-     .with_label_values(&["gcs", "GET"])
-     .inc();
294-303
: Fix label cardinality mismatch in StreamMetricWrapper (will panic at runtime).STORAGE_REQUEST_RESPONSE_TIME expects 3 labels [provider, method, status], but StreamMetricWrapper supplies only ["LIST","200"]. This will trigger a panic on observe. Also, the wrapper stores &'static str labels, preventing passing the dynamic provider string.
Refactor the wrapper to carry provider and emit all three labels.
Apply this diff:
@@ - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> { let time = time::Instant::now(); let inner = self.inner.list(prefix); - let res = StreamMetricWrapper { - time, - labels: ["LIST", "200"], - inner, - }; + let res = StreamMetricWrapper { + time, + provider: &self.provider, + method: "LIST", + status: "200", + inner, + }; Box::pin(res) } @@ - fn list_with_offset( + fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, ) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> { let time = time::Instant::now(); let inner = self.inner.list_with_offset(prefix, offset); - let res = StreamMetricWrapper { - time, - labels: ["LIST_OFFSET", "200"], - inner, - }; + let res = StreamMetricWrapper { + time, + provider: &self.provider, + method: "LIST_OFFSET", + status: "200", + inner, + }; Box::pin(res) } @@ -struct StreamMetricWrapper<'a, const N: usize, T> { +struct StreamMetricWrapper<'a, T> { time: time::Instant, - labels: [&'static str; N], + provider: &'a str, + method: &'static str, + status: &'static str, inner: BoxStream<'a, T>, } -impl<T, const N: usize> Stream for StreamMetricWrapper<'_, N, T> { +impl<T> Stream for StreamMetricWrapper<'_, T> { type Item = T; @@ - STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&self.labels) - .observe(self.time.elapsed().as_secs_f64()); + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[self.provider, self.method, self.status]) + .observe(self.time.elapsed().as_secs_f64()); tAlso applies to: 310-319, 402-426
src/storage/azure_blob.rs (1)
763-812
: Remove duplicate and inaccurate GET metrics inside get_objects().Each self.get_object() already records GET metrics. Additionally, using list_start to time GET inflates durations with the loop’s total runtime.
Apply this diff:
- STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["azure_blob", "GET", "200"]) - .observe(list_start.elapsed().as_secs_f64()); - STORAGE_FILES_SCANNED - .with_label_values(&["azure_blob", "GET"]) - .inc();Also applies to: 792-799
src/storage/s3.rs (2)
343-365
: Avoid panic on GET: handle body read errors and measure full GET latency.Same issue as Azure: unwrap on resp.bytes() can panic and elapsed is computed before reading the body.
Apply this diff:
async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> { - let time = std::time::Instant::now(); - let resp = self.client.get(&to_object_store_path(path)).await; - let elapsed = time.elapsed().as_secs_f64(); + let time = std::time::Instant::now(); + let resp = self.client.get(&to_object_store_path(path)).await; STORAGE_FILES_SCANNED .with_label_values(&["s3", "GET"]) .inc(); - match resp { - Ok(resp) => { - let body: Bytes = resp.bytes().await.unwrap(); - STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["s3", "GET", "200"]) - .observe(elapsed); - Ok(body) - } - Err(err) => { - let status_code = error_to_status_code(&err); - STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["s3", "GET", status_code]) - .observe(elapsed); - Err(err.into()) - } - } + match resp { + Ok(resp) => match resp.bytes().await { + Ok(body) => { + let elapsed = time.elapsed().as_secs_f64(); + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["s3", "GET", "200"]) + .observe(elapsed); + Ok(body) + } + Err(err) => { + let elapsed = time.elapsed().as_secs_f64(); + let status_code = error_to_status_code(&err); + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["s3", "GET", status_code]) + .observe(elapsed); + Err(err.into()) + } + }, + Err(err) => { + let elapsed = time.elapsed().as_secs_f64(); + let status_code = error_to_status_code(&err); + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["s3", "GET", status_code]) + .observe(elapsed); + Err(err.into()) + } + } }
874-923
: Remove duplicate and inaccurate GET metrics inside get_objects().Each self.get_object() already records GET metrics. Using list_start for GET inflates durations.
Apply this diff:
- STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["s3", "GET", "200"]) - .observe(list_start.elapsed().as_secs_f64()); - STORAGE_FILES_SCANNED - .with_label_values(&["s3", "GET"]) - .inc();Also applies to: 903-909
🧹 Nitpick comments (18)
src/catalog/mod.rs (1)
190-206: Result.map(...).unwrap_or(0) on counter get(): LGTM, but consider unwrap_or_default().
- Using .map(|m| m.get()).unwrap_or(0) over the prometheus Result is concise and correct with IntCounterVec (u64).
- Minor nit: unwrap_or_default() reads slightly clearer and avoids literal zeros.
No functional issues.
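For reference, the read pattern under discussion, written as a small generic helper (the helper name is ours, not the codebase's):

```rust
use prometheus::IntCounterVec;

// Read the current value of one counter series; a label set that was never
// observed yields 0 rather than an error.
fn counter_value(vec: &IntCounterVec, labels: &[&str]) -> u64 {
    vec.get_metric_with_label_values(labels)
        .map(|counter| counter.get())
        .unwrap_or_default()
}
```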
src/stats.rs (1)
205-221: delete_with_label_prefix now targets IntCounterVec: LGTM, but be aware of cost. Iterating MetricFamily and removing by exact label map is correct and avoids guessing dates. This is O(N labels) and fine for admin paths; keep it off hot paths.
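For the removal side, a hedged sketch of dropping one exact label set from a counter vec (the helper name is illustrative; the real code walks MetricFamily to discover the label sets first):

```rust
use prometheus::IntCounterVec;

// Drop one series from a counter vec once its date is no longer tracked, so
// stale label sets do not linger in the registry. remove_label_values expects
// every label value, in the order the vec was registered with.
fn drop_series(vec: &IntCounterVec, labels: &[&str]) -> bool {
    vec.remove_label_values(labels).is_ok()
}
```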
src/storage/object_storage.rs (1)
148-150
: Filename parsing assumes dot-separated convention; consider a defensive fallback (context-aware).This split/indexing will panic if the filename deviates. Based on the retrieved learning (Parseable’s server guarantees the
date=YYYY-MM-DD.hour=HH.minute=MM
pattern), this is safe in normal flows. Still, a debug assert or graceful fallback would make this more robust during migrations or manual uploads.- let mut file_date_part = filename.split('.').collect::<Vec<&str>>()[0]; - file_date_part = file_date_part.split('=').collect::<Vec<&str>>()[1]; + let mut file_date_part = filename.split('.').next().unwrap_or(filename); + file_date_part = file_date_part + .split('=') + .nth(1) + .unwrap_or_else(|| { + debug_assert!(false, "Unexpected parquet filename format: {filename}"); + "unknown-date" + });src/query/listing_table_builder.rs (1)
98-109
: Best-effort listing instead of hard fail (optional).A transient failure on one prefix aborts the whole listing. Consider logging and continuing, so older segments still get scanned.
- Err(e) => { - return Err(DataFusionError::External(Box::new(e))); - } + Err(e) => { + tracing::warn!("list_dirs_relative failed for {}: {e}", prefix); + continue; + }src/query/stream_schema_provider.rs (2)
569-573
: Metric semantics: “files scanned” vs planned parquet count.You increment
STORAGE_FILES_SCANNED[.., "GET"]
by the number of parquet files from manifests. These are “planned to scan,” not actual GET/HEADs (which DataFusion will perform later). Either move this count where I/O happens or rename/use a planning metric to avoid inflating GET counts.
843-866
: Duplicate manifest collection logic; unify to one helper.This is nearly identical to enterprise’s
collect_manifest_files
(see src/enterprise/utils.rs). Consider centralizing it to avoid divergence in error handling and performance.src/storage/gcs.rs (2)
665-690
: HEAD double-count:STORAGE_FILES_SCANNED
incremented twice.You inc() at Lines 669-671 and again at 677-679 within the Ok branch. Keep only one increment per HEAD to avoid inflation.
- // Record single file accessed - STORAGE_FILES_SCANNED - .with_label_values(&["gcs", "HEAD"]) - .inc();
348-386
: LIST metrics only capture stream creation time, not stream consumption (optional).You observe LIST timing immediately after creating the stream. For truer latency, consider measuring from start to end of consumption or supplement with per-chunk timings (you already do that for GET/DELETE). Not critical if consistency across providers is the goal.
src/storage/localfs.rs (3)
133-137
: Consider not incrementing HEAD for an unimplemented operation.Incrementing “files scanned” for HEAD even though it immediately errors skews counts. Either remove it here or add a dedicated “UNSUPPORTED” status dimension (if you want to track attempts).
- STORAGE_FILES_SCANNED - .with_label_values(&["localfs", "HEAD"]) - .inc();
304-373
: Streaming LIST/GET accounting is consistent; minor nit on 404 mapping.This path reports LIST 200 when the directory exists and per-file GET 200/404 as appropriate. If directory doesn’t exist you surface LIST 404 and return early—fine for now. Consider treating missing dir as empty listing to be more permissive, but not required.
416-440
: DELETE prefix: observe status but no “files scanned” increment (OK).Given this removes a directory, not a file object, omitting a DELETE counter increment makes sense. If you want parity with object-store providers, you could increment by the number of entries removed (requires walking), but that’s optional.
src/storage/metrics_layer.rs (2)
39-66
: Broaden and correct error-to-status mapping.
- Mapping Generic to 400 likely misclassifies server/transport failures as client errors; 500 is safer.
- Consider handling other common variants (e.g., InvalidPath -> 400, InvalidRange -> 416, Timeout/DeadlineExceeded -> 504/408) for better fidelity.
Apply this diff (adjust variants per object_store version you use):
pub fn error_to_status_code(err: &object_store::Error) -> &'static str { match err { - // 400 Bad Request - Client errors - object_store::Error::Generic { .. } => "400", + // 400 Bad Request - obvious client errors + object_store::Error::InvalidPath { .. } => "400", + // 416 Range Not Satisfiable + object_store::Error::InvalidRange { .. } => "416", + // Treat generic/unknown as server-side + object_store::Error::Generic { .. } => "500", // 401 Unauthorized - Authentication required object_store::Error::Unauthenticated { .. } => "401", // 404 Not Found - Resource doesn't exist object_store::Error::NotFound { .. } => "404", // 409 Conflict - Resource already exists object_store::Error::AlreadyExists { .. } => "409", // 412 Precondition Failed - If-Match, If-None-Match, etc. failed object_store::Error::Precondition { .. } => "412", // 304 Not Modified object_store::Error::NotModified { .. } => "304", // 501 Not Implemented - Feature not supported object_store::Error::NotSupported { .. } => "501", + // 504 Gateway Timeout (or choose 408 Request Timeout) + object_store::Error::Timeout { .. } => "504", // 500 Internal Server Error - All other errors _ => "500", } }If the exact variant names differ in your object_store version, I can tailor this.
369-399
: Standardize method labels for conditional ops.Use operation names that match method names to ease querying and dashboards.
Apply this diff:
- .with_label_values(&[&self.provider, "COPY_IF", status]) + .with_label_values(&[&self.provider, "COPY_IF_NOT_EXISTS", status]) @@ - .with_label_values(&[&self.provider, "RENAME_IF", status]) + .with_label_values(&[&self.provider, "RENAME_IF_NOT_EXISTS", status])src/storage/azure_blob.rs (2)
723-746
: HEAD files-scanned should increment regardless of success.Other paths (e.g., check) count attempts, not just successes. Increment once per HEAD call to keep semantics consistent.
Apply this diff:
- match &result { + match &result { Ok(_) => { STORAGE_REQUEST_RESPONSE_TIME .with_label_values(&["azure_blob", "HEAD", "200"]) .observe(head_elapsed); - // Record single file accessed - STORAGE_FILES_SCANNED - .with_label_values(&["azure_blob", "HEAD"]) - .inc(); } Err(err) => { let status_code = error_to_status_code(err); STORAGE_REQUEST_RESPONSE_TIME .with_label_values(&["azure_blob", "HEAD", status_code]) .observe(head_elapsed); } } - Ok(result?) + // Count the attempt once + STORAGE_FILES_SCANNED + .with_label_values(&["azure_blob", "HEAD"]) + .inc(); + Ok(result?)
1006-1056
: Consider counting HEAD attempts in list_old_streams() checks.You record HEAD latency per stream but don’t increment STORAGE_FILES_SCANNED for HEAD. Add a single increment per attempt inside the task.
Apply this diff:
let task = async move { let head_start = Instant::now(); let result = self.client.head(&StorePath::from(key)).await; let head_elapsed = head_start.elapsed().as_secs_f64(); match &result { Ok(_) => { STORAGE_REQUEST_RESPONSE_TIME .with_label_values(&["azure_blob", "HEAD", "200"]) .observe(head_elapsed); } Err(err) => { let status_code = error_to_status_code(err); STORAGE_REQUEST_RESPONSE_TIME .with_label_values(&["azure_blob", "HEAD", status_code]) .observe(head_elapsed); } } + STORAGE_FILES_SCANNED + .with_label_values(&["azure_blob", "HEAD"]) + .inc(); result.map(|_| ()) };src/storage/s3.rs (2)
666-686
: Standardize multipart INIT label to match Azure/GCS.Use PUT_MULTIPART_INIT for the initiation step for consistency across providers.
Apply this diff:
- let mut async_writer = match async_writer { + let mut async_writer = match async_writer { Ok(writer) => { STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["s3", "PUT_MULTIPART", "200"]) + .with_label_values(&["s3", "PUT_MULTIPART_INIT", "200"]) .observe(multipart_elapsed); writer } Err(err) => { let status_code = error_to_status_code(&err); STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["s3", "PUT_MULTIPART", status_code]) + .with_label_values(&["s3", "PUT_MULTIPART_INIT", status_code]) .observe(multipart_elapsed); return Err(err.into()); } };Also applies to: 672-676
1117-1167
: Consider counting HEAD attempts in list_old_streams() checks.Latency is recorded per HEAD, but files-scanned is not incremented.
Apply this diff:
let task = async move { let head_start = Instant::now(); let result = self.client.head(&StorePath::from(key)).await; let head_elapsed = head_start.elapsed().as_secs_f64(); @@ } - result.map(|_| ()) + STORAGE_FILES_SCANNED + .with_label_values(&["s3", "HEAD"]) + .inc(); + result.map(|_| ()) };Also applies to: 1140-1159
src/metrics/storage.rs (1)
25-33
: Global histogram looks good; consider explicit buckets.Default buckets may not fit object-store latencies. Consider Prometheus-style buckets for sub-ms to tens-of-seconds.
Apply this diff:
pub static STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| { - HistogramVec::new( - HistogramOpts::new("storage_request_response_time", "Storage Request Latency") - .namespace(METRICS_NAMESPACE), - &["provider", "method", "status"], - ) + HistogramVec::new( + HistogramOpts::new("storage_request_response_time", "Storage Request Latency") + .namespace(METRICS_NAMESPACE) + .buckets(vec![ + 0.005, 0.01, 0.025, 0.05, 0.1, + 0.25, 0.5, 1.0, 2.5, 5.0, + 10.0, 30.0, 60.0, + ]), + &["provider", "method", "status"], + ) .expect("metric can be created") });
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (14)
src/catalog/mod.rs
(1 hunks)src/metadata.rs
(3 hunks)src/metrics/mod.rs
(6 hunks)src/metrics/storage.rs
(1 hunks)src/query/listing_table_builder.rs
(2 hunks)src/query/mod.rs
(1 hunks)src/query/stream_schema_provider.rs
(12 hunks)src/stats.rs
(3 hunks)src/storage/azure_blob.rs
(24 hunks)src/storage/gcs.rs
(24 hunks)src/storage/localfs.rs
(15 hunks)src/storage/metrics_layer.rs
(8 hunks)src/storage/object_storage.rs
(2 hunks)src/storage/s3.rs
(24 hunks)
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-08-20T17:01:25.791Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1409
File: src/storage/field_stats.rs:429-456
Timestamp: 2025-08-20T17:01:25.791Z
Learning: In Parseable's field stats calculation (src/storage/field_stats.rs), the extract_datetime_from_parquet_path_regex function correctly works with filename-only parsing because Parseable's server-side filename generation guarantees the dot-separated format date=YYYY-MM-DD.hour=HH.minute=MM pattern in parquet filenames.
Applied to files:
src/storage/object_storage.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.
Applied to files:
src/query/mod.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.
Applied to files:
src/storage/gcs.rs
src/storage/azure_blob.rs
src/storage/s3.rs
📚 Learning: 2025-08-21T14:41:55.462Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1410
File: src/storage/object_storage.rs:876-916
Timestamp: 2025-08-21T14:41:55.462Z
Learning: In Parseable's object storage system (src/storage/object_storage.rs), date directories (date=YYYY-MM-DD) are only created when there's actual data to store, which means they will always contain corresponding hour and minute subdirectories. There can be no case where a date directory exists without hour or minute subdirectories.
Applied to files:
src/storage/gcs.rs
src/storage/azure_blob.rs
src/storage/localfs.rs
src/storage/s3.rs
📚 Learning: 2025-08-21T11:47:01.279Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1410
File: src/storage/object_storage.rs:0-0
Timestamp: 2025-08-21T11:47:01.279Z
Learning: In Parseable's object storage implementation (src/storage/object_storage.rs), the hour and minute directory prefixes (hour=XX, minute=YY) are generated from arrow file timestamps following proper datetime conventions, so they are guaranteed to be within valid ranges (0-23 for hours, 0-59 for minutes) and don't require additional range validation.
Applied to files:
src/storage/gcs.rs
src/storage/azure_blob.rs
src/storage/s3.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/query/stream_schema_provider.rs
🧬 Code graph analysis (9)
src/catalog/mod.rs (1)
src/stats.rs (1)
event_labels
(223-225)
src/metadata.rs (1)
src/catalog/mod.rs (4)
num_rows
(61-61)num_rows
(78-80)ingestion_size
(58-58)ingestion_size
(70-72)
src/storage/gcs.rs (3)
src/storage/metrics_layer.rs (3)
error_to_status_code
(40-66)new
(75-80)head
(255-269)src/storage/object_storage.rs (3)
parseable_json_path
(1281-1283)new
(82-91)head
(209-209)src/storage/s3.rs (3)
from
(1345-1353)from
(1357-1359)head
(831-857)
src/storage/azure_blob.rs (2)
src/storage/metrics_layer.rs (3)
error_to_status_code
(40-66)new
(75-80)head
(255-269)src/storage/gcs.rs (12)
resp
(296-302)resp
(420-426)resp
(1130-1135)resp
(1164-1169)_delete_prefix
(233-282)_list_streams
(284-346)name
(126-128)_list_dates
(348-386)_upload_file
(469-494)head
(664-690)get_ingestor_meta_file_paths
(758-793)check
(885-911)
src/storage/localfs.rs (5)
src/storage/object_storage.rs (7)
get_ingestor_meta_file_paths
(339-341)delete_prefix
(227-227)delete_object
(338-338)check
(228-228)delete_stream
(229-229)try_delete_node_meta
(346-346)list_streams
(230-230)src/storage/metrics_layer.rs (2)
delete_stream
(287-292)copy
(337-351)src/storage/azure_blob.rs (7)
get_ingestor_meta_file_paths
(814-849)delete_prefix
(909-913)delete_object
(915-939)check
(941-967)delete_stream
(969-973)try_delete_node_meta
(975-1000)list_streams
(1002-1004)src/storage/gcs.rs (7)
get_ingestor_meta_file_paths
(758-793)delete_prefix
(853-857)delete_object
(859-883)check
(885-911)delete_stream
(913-917)try_delete_node_meta
(919-944)list_streams
(946-948)src/storage/s3.rs (9)
get_ingestor_meta_file_paths
(925-960)from
(1345-1353)from
(1357-1359)delete_prefix
(1020-1024)delete_object
(1026-1050)check
(1052-1078)delete_stream
(1080-1084)try_delete_node_meta
(1086-1111)list_streams
(1113-1115)
src/storage/s3.rs (3)
src/storage/metrics_layer.rs (3)
error_to_status_code
(40-66)new
(75-80)head
(255-269)src/storage/object_storage.rs (4)
parseable_json_path
(1281-1283)new
(82-91)name
(200-200)head
(209-209)src/storage/azure_blob.rs (8)
resp
(331-337)resp
(455-461)resp
(1190-1195)resp
(1224-1229)_delete_prefix
(268-317)_list_streams
(319-381)name
(165-167)head
(723-746)
src/metrics/storage.rs (1)
src/storage/metrics_layer.rs (1)
new
(75-80)
src/storage/metrics_layer.rs (3)
src/storage/azure_blob.rs (1)
head
(723-746)src/storage/gcs.rs (1)
head
(664-690)src/storage/s3.rs (1)
head
(831-857)
src/query/stream_schema_provider.rs (3)
src/parseable/mod.rs (4)
storage
(244-246)new
(146-158)serde_json
(300-300)serde_json
(306-306)src/enterprise/utils.rs (1)
collect_manifest_files
(159-182)src/storage/s3.rs (2)
from
(1345-1353)from
(1357-1359)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
🔇 Additional comments (17)
src/metrics/mod.rs (3)
268-276
: Registry updates for new totals: LGTM.The new TOTAL_*_DATE metrics are registered. Once labels include stream (see prior comment), this remains correct.
33-33
: Help text clarifications: LGTM.Descriptions now explicitly say “for a stream”, improving clarity and aligning with label cardinality.
Also applies to: 41-46, 53-53, 61-61, 69-75, 83-83, 93-97, 107-107, 119-119
127-137: The script will print the relevant section for review. Please share the output so we can verify whether the catalog module correctly casts metrics to u64.
src/query/mod.rs (1)
555-566: The script will dump the collect_manifest_files implementation so we can verify how it handles absolute URLs.
src/stats.rs (1)
25-25
: Imports for counters and TOTAL_ metrics: LGTM.*Prepares this module to manage per-date counters and totals.
Also applies to: 32-34
src/metadata.rs (1)
49-57
: Counters: correct shift to inc_by for per-date metrics.
- update_stats and load_daily_metrics now use inc_by(...) for per-date counters. This matches the new metric types and avoids set(...) races.
One caution: load_daily_metrics should be called only once per process start (or after clearing/removing label series) to avoid double accumulation.
Provide the call chain where load_daily_metrics is invoked to ensure idempotent usage.
Also applies to: 181-189
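One way to make that guarantee explicit is a process-wide guard; this is a hypothetical sketch, not how the PR wires load_daily_metrics:

```rust
use std::sync::Once;

static DAILY_METRICS_SEEDED: Once = Once::new();

// Seed per-date counters from storage at most once per process; a second call
// becomes a no-op instead of double-accumulating the inc_by() totals.
fn seed_daily_metrics_once(load: impl FnOnce()) {
    DAILY_METRICS_SEEDED.call_once(load);
}
```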
src/storage/object_storage.rs (1)
158-166: Metric type consistency: counter vs gauge.
EVENTS_STORAGE_SIZE_DATE switched to inc_by (counter), while STORAGE_SIZE / LIFETIME_EVENTS_STORAGE_SIZE still use .add() (gauge). If that’s intentional (lifetime and current gauges vs per-date counters), all good. Please confirm EVENTS_STORAGE_SIZE_DATE is now an IntCounterVec or this won’t compile.
src/query/stream_schema_provider.rs (1)
281-289
: API shift to storage-only listing looks good.The migration to storage-based listing and removal of the ObjectStore dependency keeps concerns localized. No issues spotted here.
src/storage/localfs.rs (3)
148-176
: GET instrumentation looks solid; status mapping covers 200/404/500.Good balance between observability and behavior: no panics, and single-file GET increments are accurate.
389-408
: PUT metrics and single-file count are correct.Instrumentation patterns align with the rest of the providers.
557-602
: list_streams observability matches provider patterns; LGTM.The LIST status timing and error-to-status mapping are consistent. No issues spotted.
src/storage/metrics_layer.rs (2)
71-80
: Provider label addition looks good.Storing provider on MetricLayer and extending new(inner, provider) is the right move for consistent tagging.
96-110
: Instrumentation approach is clean and consistent.Measuring elapsed time per call, mapping to status, and returning inner results directly keeps behavior unchanged while improving observability.
Also applies to: 118-131, 154-167, 172-185, 187-201, 203-217, 219-233, 235-253, 255-269, 271-285, 321-335, 337-351, 353-367, 369-383, 385-399
src/storage/azure_blob.rs (2)
245-265
: PUT metrics and counting look good.Timing and status labeling are correct and the files-scanned counter is incremented once per PUT attempt.
267-316
: Instrumentation and counters look consistent across operations.Timing + provider/method/status labels are applied consistently; per-operation files-scanned counters are incremented appropriately.
Also applies to: 319-381, 383-421, 423-502, 504-529, 915-939, 941-967, 977-999, 1069-1095, 1103-1129, 1169-1235
src/storage/s3.rs (1)
372-392
: Instrumentation is broadly consistent and aligns with PR goals.Per-operation timings, provider/method/status labeling, and files-scanned counters are applied as expected.
Also applies to: 395-444, 446-508, 510-548, 550-629, 631-656, 693-786, 788-821, 925-960, 962-1006, 1026-1050, 1052-1078, 1086-1111, 1179-1241, 1277-1295, 1311-1329
src/metrics/storage.rs (1)
35-45
: Files-scanned counter is a good addition.Labeling by [provider, operation] should make the dashboards for “files touched” straightforward.
// Build all prefixes as relative paths
let prefixes: Vec<_> = prefixes
    .into_iter()
    .map(|prefix| {
        relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix))
    })
    .collect();

// Use storage.list_dirs_relative for all prefixes and flatten results
let mut listing = Vec::new();
for prefix in prefixes {
    let path = relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix));
    let prefix = storage.absolute_url(path.as_relative_path()).to_string();
    if let Some(pos) = prefix.rfind("minute") {
        let hour_prefix = &prefix[..pos];
        minute_resolve
            .entry(hour_prefix.to_owned())
            .or_default()
            .push(prefix);
    } else {
        all_resolve.push(prefix);
    match storage.list_dirs_relative(&prefix).await {
        Ok(paths) => {
            listing.extend(paths.into_iter().map(|p| p.to_string()));
        }
        Err(e) => {
            return Err(DataFusionError::External(Box::new(e)));
        }
    }
}

/// Resolve all prefixes asynchronously and collect the object metadata.
type ResolveFuture =
    Pin<Box<dyn Future<Output = Result<Vec<ObjectMeta>, object_store::Error>> + Send>>;
let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();
for (listing_prefix, prefixes) in minute_resolve {
    let client = Arc::clone(&client);
    tasks.push(Box::pin(async move {
        let path = Path::from(listing_prefix);
        let mut objects = client.list(Some(&path)).try_collect::<Vec<_>>().await?;

        objects.retain(|obj| {
            prefixes.iter().any(|prefix| {
                obj.location
                    .prefix_matches(&object_store::path::Path::from(prefix.as_ref()))
            })
        });

        Ok(objects)
    }));
}

for prefix in all_resolve {
    let client = Arc::clone(&client);
    tasks.push(Box::pin(async move {
        client
            .list(Some(&object_store::path::Path::from(prefix)))
            .try_collect::<Vec<_>>()
            .await
    }));
}

let listing = tasks
    .try_collect::<Vec<Vec<ObjectMeta>>>()
    .await
    .map_err(|err| DataFusionError::External(Box::new(err)))?
    .into_iter()
    .flat_map(|res| {
        res.into_iter()
            .map(|obj| obj.location.to_string())
            .collect::<Vec<String>>()
    })
    .sorted()
    .rev()
    .collect_vec();
let listing = listing.into_iter().sorted().rev().collect_vec();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Listing paths lose their parent prefix; produces invalid URIs for query_prefixes.
You collect child names (e.g., “minute=00”) from list_dirs_relative(prefix)
but push them directly into listing
. query_prefixes
then renders gs://bucket/{child}
, missing stream/date=/hour=
. Join the original prefix with the child.
- // Use storage.list_dirs_relative for all prefixes and flatten results
- let mut listing = Vec::new();
- for prefix in prefixes {
- match storage.list_dirs_relative(&prefix).await {
- Ok(paths) => {
- listing.extend(paths.into_iter().map(|p| p.to_string()));
- }
- Err(e) => {
- return Err(DataFusionError::External(Box::new(e)));
- }
- }
- }
+ // Use storage.list_dirs_relative for all prefixes and flatten results,
+ // preserving the full path
+ let mut listing = Vec::new();
+ for prefix in prefixes {
+ let base = prefix.as_str().to_owned();
+ match storage.list_dirs_relative(&prefix).await {
+ Ok(children) => {
+ listing.extend(children.into_iter().map(|c| format!("{}/{}", base, c)));
+ }
+ Err(e) => {
+ return Err(DataFusionError::External(Box::new(e)));
+ }
+ }
+ }
🤖 Prompt for AI Agents
In src/query/listing_table_builder.rs around lines 90 to 111, the code currently
pushes only child names returned from storage.list_dirs_relative(&prefix) into
listing, which loses the parent prefix and results in invalid URIs; change the
push to join each child path with its original prefix (preserving relative path
semantics) before adding to listing (e.g., use the prefix to join/concatenate
the child path or call a RelativePathBuf join method), keep error handling
as-is, then sort/reverse as before.
let compressed_size = path
    .metadata()
    .map(|m| m.len())
    .map_err(|e| ObjectStorageError::Custom(format!("metadata failed for {filename}: {e}")))?;
STORAGE_SIZE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don’t fail the upload flow if local metadata read fails.
If metadata()
errors here, the whole upload path fails after the object has already been uploaded, leaving a pushed object without a manifest and the staged file retained. Metrics should be best-effort; log and continue with 0 or move metrics after manifest creation.
- let compressed_size = path
- .metadata()
- .map(|m| m.len())
- .map_err(|e| ObjectStorageError::Custom(format!("metadata failed for {filename}: {e}")))?;
+ let compressed_size = match path.metadata().map(|m| m.len()) {
+ Ok(len) => len,
+ Err(e) => {
+ warn!("metadata() failed for {filename}: {e}; skipping size-based metrics");
+ 0
+ }
+ };
Optionally, move metrics after manifest creation and wrap in a non-fatal block.
🤖 Prompt for AI Agents
In src/storage/object_storage.rs around lines 150-154, the current call to
path.metadata().map(...).map_err(...) aborts the upload if metadata() fails;
change this so metadata failures are non-fatal: either attempt to read metadata
but on Err log the error and set compressed_size to 0 (or None) and continue, or
move the metrics/metadata computation to after manifest creation and run it in a
non-fatal block that logs errors without returning Err. Ensure you do not
propagate ObjectStorageError for metadata failures so the upload/manifest flow
completes and staged file cleanup still occurs.
6850fb2 to c88615f
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/query/stream_schema_provider.rs (1)
587-596: Avoid unwrap on ObjectStoreUrl::parse to prevent rare panics. If storage.store_url() is ever malformed (misconfiguration), unwrap will panic the query path. Map to a DataFusion error instead.
Apply this diff:
- ObjectStoreUrl::parse(storage.store_url()).unwrap(),
+ ObjectStoreUrl::parse(storage.store_url())
+     .map_err(|e| DataFusionError::External(Box::new(e)))?,
src/storage/metrics_layer.rs (1)
295-303
: Fix label cardinality bug in StreamMetricWrapper (2 labels supplied to a 3-label histogram).STORAGE_REQUEST_RESPONSE_TIME expects 3 labels: provider, operation, status. The StreamMetricWrapper supplies only ["LIST","200"], causing a runtime label-cardinality panic and losing metrics for streaming list operations.
Refactor wrapper to carry provider and emit 3 labels:
@@ - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> { let time = time::Instant::now(); let inner = self.inner.list(prefix); - let res = StreamMetricWrapper { - time, - labels: ["LIST", "200"], - inner, - }; + let res = StreamMetricWrapper { + time, + provider: &self.provider, + op: "LIST", + status: "200", + inner, + }; Box::pin(res) } @@ - fn list_with_offset( + fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, ) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> { let time = time::Instant::now(); let inner = self.inner.list_with_offset(prefix, offset); - let res = StreamMetricWrapper { - time, - labels: ["LIST_OFFSET", "200"], - inner, - }; + let res = StreamMetricWrapper { + time, + provider: &self.provider, + op: "LIST_OFFSET", + status: "200", + inner, + }; Box::pin(res) } @@ -struct StreamMetricWrapper<'a, const N: usize, T> { - time: time::Instant, - labels: [&'static str; N], - inner: BoxStream<'a, T>, -} +struct StreamMetricWrapper<'a, T> { + time: time::Instant, + provider: &'a str, + op: &'static str, + status: &'static str, + inner: BoxStream<'a, T>, +} @@ -impl<T, const N: usize> Stream for StreamMetricWrapper<'_, N, T> { +impl<T> Stream for StreamMetricWrapper<'_, T> { type Item = T; @@ - STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&self.labels) - .observe(self.time.elapsed().as_secs_f64()); + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&[self.provider, self.op, self.status]) + .observe(self.time.elapsed().as_secs_f64()); tAlso applies to: 305-320, 402-426
src/storage/gcs.rs (1)
760-775
: Remove duplicate GET timing and counting from get_objects loop.You already time GETs and increment GET files-scanned inside _get_object(). The loop here re-records GET using the list_start timer and increments GET counts again, inflating numbers and mis-measuring latency.
- STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["gcs", "GET", "200"]) - .observe(list_start.elapsed().as_secs_f64()); - STORAGE_FILES_SCANNED - .with_label_values(&["gcs", "GET"]) - .inc(); - STORAGE_FILES_SCANNED_DATE - .with_label_values(&["gcs", "GET", &Utc::now().date_naive().to_string()]) - .inc();
♻️ Duplicate comments (7)
src/storage/gcs.rs (3)
559-577
: Consistency: count small-file PUTs in multipart path.In the <5MB branch you perform a single PUT but don’t bump files-scanned. Align with other PUT paths.
let put_start = Instant::now(); let result = self.client.put(location, data.into()).await; let put_elapsed = put_start.elapsed().as_secs_f64(); + STORAGE_FILES_SCANNED + .with_label_values(&["gcs", "PUT"]) + .inc(); + STORAGE_FILES_SCANNED_DATE + .with_label_values(&["gcs", "PUT", &Utc::now().date_naive().to_string()]) + .inc();
185-207
: Avoid panic and measure full GET latency (don’t unwrap body, record after bytes()).resp.bytes().await.unwrap() can panic, and you record latency before the body read. Record GET timing after the body is fully read; map body-read errors to status codes.
async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> { - let time = std::time::Instant::now(); - let resp = self.client.get(&to_object_store_path(path)).await; - let elapsed = time.elapsed().as_secs_f64(); + let time = std::time::Instant::now(); + let resp = self.client.get(&to_object_store_path(path)).await; STORAGE_FILES_SCANNED .with_label_values(&["gcs", "GET"]) .inc(); STORAGE_FILES_SCANNED_DATE .with_label_values(&["gcs", "GET", &Utc::now().date_naive().to_string()]) .inc(); match resp { - Ok(resp) => { - let body: Bytes = resp.bytes().await.unwrap(); - STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["gcs", "GET", "200"]) - .observe(elapsed); - Ok(body) - } + Ok(resp) => match resp.bytes().await { + Ok(body) => { + let elapsed = time.elapsed().as_secs_f64(); + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["gcs", "GET", "200"]) + .observe(elapsed); + Ok(body) + } + Err(err) => { + let elapsed = time.elapsed().as_secs_f64(); + let status_code = error_to_status_code(&err); + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["gcs", "GET", status_code]) + .observe(elapsed); + Err(err.into()) + } + }, Err(err) => { - let status_code = error_to_status_code(&err); - STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["gcs", "GET", status_code]) - .observe(elapsed); + let elapsed = time.elapsed().as_secs_f64(); + let status_code = error_to_status_code(&err); + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["gcs", "GET", status_code]) + .observe(elapsed); Err(err.into()) } } }
256-276
: Count deletes on success only; move increment after successful delete.files_deleted is incremented before attempting the delete; this overcounts when deletes fail. Increment only on Ok(_).
- Ok(obj) => { - files_deleted.fetch_add(1, Ordering::Relaxed); + Ok(obj) => { let delete_start = Instant::now(); let delete_resp = self.client.delete(&obj.location).await; let delete_elapsed = delete_start.elapsed().as_secs_f64(); match delete_resp { Ok(_) => { + files_deleted.fetch_add(1, Ordering::Relaxed); STORAGE_REQUEST_RESPONSE_TIME .with_label_values(&["gcs", "DELETE", "200"]) .observe(delete_elapsed);src/storage/azure_blob.rs (3)
594-617
: Consistency: count small-file PUTs in multipart path.Add STORAGE_FILES_SCANNED increments for the single PUT path to match other PUT flows.
let put_start = Instant::now(); let result = self.client.put(location, data.into()).await; let put_elapsed = put_start.elapsed().as_secs_f64(); + STORAGE_FILES_SCANNED + .with_label_values(&["azure_blob", "PUT"]) + .inc(); + STORAGE_FILES_SCANNED_DATE + .with_label_values(&["azure_blob", "PUT", &Utc::now().date_naive().to_string()]) + .inc();
220-244
: Avoid panic and measure full GET latency (don’t unwrap body, record after bytes()).Same issue as GCS: unwrap can panic; also you measure elapsed before reading the body. Record GET timing after bytes() and handle errors.
async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> { - let time = std::time::Instant::now(); - let resp = self.client.get(&to_object_store_path(path)).await; - let elapsed = time.elapsed().as_secs_f64(); + let time = std::time::Instant::now(); + let resp = self.client.get(&to_object_store_path(path)).await; STORAGE_FILES_SCANNED .with_label_values(&["azure_blob", "GET"]) .inc(); STORAGE_FILES_SCANNED_DATE .with_label_values(&["azure_blob", "GET", &Utc::now().date_naive().to_string()]) .inc(); - match resp { - Ok(resp) => { - let body: Bytes = resp.bytes().await.unwrap(); - STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["azure_blob", "GET", "200"]) - .observe(elapsed); - Ok(body) - } + match resp { + Ok(resp) => match resp.bytes().await { + Ok(body) => { + let elapsed = time.elapsed().as_secs_f64(); + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["azure_blob", "GET", "200"]) + .observe(elapsed); + Ok(body) + } + Err(err) => { + let elapsed = time.elapsed().as_secs_f64(); + let status_code = error_to_status_code(&err); + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["azure_blob", "GET", status_code]) + .observe(elapsed); + Err(err.into()) + } + }, Err(err) => { - let status_code = error_to_status_code(&err); - STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["azure_blob", "GET", status_code]) - .observe(elapsed); + let elapsed = time.elapsed().as_secs_f64(); + let status_code = error_to_status_code(&err); + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["azure_blob", "GET", status_code]) + .observe(elapsed); Err(err.into()) } } }
292-312
: Count deletes on success only; move increment after successful delete.files_deleted is incremented before the delete attempt. Increment only when delete returns Ok(_).
- Ok(obj) => { - files_deleted.fetch_add(1, Ordering::Relaxed); + Ok(obj) => { let delete_start = Instant::now(); let delete_resp = self.client.delete(&obj.location).await; let delete_elapsed = delete_start.elapsed().as_secs_f64(); match delete_resp { Ok(_) => { + files_deleted.fetch_add(1, Ordering::Relaxed); STORAGE_REQUEST_RESPONSE_TIME .with_label_values(&["azure_blob", "DELETE", "200"]) .observe(delete_elapsed);src/storage/s3.rs (1)
827-835
: HEAD “files scanned” double-count from previous review is now resolvedIncrement occurs exactly once per HEAD attempt and not again in the Ok branch. This aligns with the guidance in the prior review comment.
Also applies to: 865-874
🧹 Nitpick comments (17)
src/query/stream_schema_provider.rs (7)
327-401
: Avoid over-partitioning when the file list is smallUsing all CPUs for partition fan-out can create a lot of empty file groups when there are only a few files, with minor scheduling overhead. Cap partitions by the number of files while keeping a floor of 1.
Apply this diff:
- let target_partition: usize = num_cpus::get(); + let cpu = num_cpus::get(); + // Avoid many empty partitions when the manifest file list is small + let target_partition: usize = std::cmp::max(1, std::cmp::min(cpu, manifest_files.len()));
474-516
: Prefer preserving the original storage error instead of stringifying itToday get_object_store_format errors are mapped to DataFusionError::Plan with err.to_string(). Consider carrying the source error to keep context (provider, status, etc.) intact.
Apply this diff:
- .await - .map_err(|err| DataFusionError::Plan(err.to_string()))?; + .await + .map_err(|err| DataFusionError::External(Box::new(err)))?;
520-536
: Nit: fix variable name typo for readability (listing_time_fiters → listing_time_filters)Small readability tweak; avoids future confusion.
Apply this diff:
- if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) { - let listing_time_fiters = - return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters); + if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) { + let listing_time_filters = + return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters); - if let Some(listing_time_filter) = listing_time_fiters { + if let Some(listing_time_filter) = listing_time_filters {
572-583
: Files-scanned counters: confirm intended semantics and avoid potential double-countingIncrementing STORAGE_FILES_SCANNED(_DATE) by the number of parquet files in the plan captures DataFusion-driven GETs that bypass the storage abstraction. That’s useful, but note:
- Planning may overcount if pruning/early-limit prevents opening some files.
- If we later wrap DataFusion’s object_store client for metrics, we may double count.
If the intent is “planned parquet files to scan,” consider documenting that in the metric help text and/or using a distinct method label (e.g., SCAN) to distinguish from storage-layer GET calls. Otherwise, this looks fine.
Would you like me to open a follow-up to align metric help/labels so dashboards don’t mix “planned scans” with “actual GETs”?
87-99
: Optional: Auto-create stream/schema from storage in Query/Prism modeBased on the retrieved learnings from PR #1185, when a table is not present in memory, Query/Prism modes should attempt to create the stream and schema from storage before returning None. This avoids false negatives when the process starts without preloaded stream metadata.
Using the pattern from the learnings:
// Pseudocode inside GlobalSchemaProvider::table(...) if !self.table_exist(name) && (PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism) { if PARSEABLE.create_stream_and_schema_from_storage(name).await.unwrap_or(false) { // proceed to return the table as in the happy path } else { return Ok(None); } }I can send a concrete diff if you confirm create_stream_and_schema_from_storage is available here.
Note: This suggestion leverages the “retrieved learnings” provided for this repository.
495-513
: Query/Prism: merging snapshots from stream.json is fine; consider error/metrics postureYou’re permissive on errors (ignore failures to read/parse stream.json) which favors availability. If you want observability, at minimum log debug/warn with the failing key and error cause. Also, if the storage layer doesn’t already emit metrics for get_objects here, consider relying on it rather than adding counters locally, to avoid duplication.
I can add structured logs here with provider/path labels if you want to keep the “best effort” behavior but make failures visible in traces.
586-596
: Minor: pass time_partition by reference consistentlyA few call sites clone Option; passing as_ref()/cloned() is already mixed above. Not a blocker, just a reminder to keep it consistent for readability.
src/metrics/mod.rs (1)
163-173
: Nit: clarify help text to reflect “global totals across all streams”.Per the PR discussion and retrieved learnings, TOTAL_*_DATE metrics intentionally aggregate across all streams using labels ["format","date"]. To avoid future confusion with per-stream metrics, tweak the help text to explicitly say “across all streams.”
Opts::new( "total_events_ingested_date", - "total events ingested on a particular date", + "Total events ingested across all streams on a particular date", ) ... Opts::new( "total_events_ingested_size_date", - "Total events ingested size in bytes on a particular date", + "Total events ingested size in bytes across all streams on a particular date", ) ... Opts::new( "total_events_storage_size_date", - "Total events storage size in bytes on a particular date", + "Total events storage size in bytes across all streams on a particular date", )Also applies to: 175-185, 187-197
src/storage/metrics_layer.rs (1)
187-201
: Optional: consider counting “files scanned” in the layer for DataFusion code paths.Operations invoked via DataFusion use this MetricLayer, not the provider modules. If you want files-scanned to be complete “for all object store APIs,” add STORAGE_FILES_SCANNED/STORAGE_FILES_SCANNED_DATE increments here as well (mirroring provider modules). If intentional to keep counts only in provider code, feel free to ignore.
Also applies to: 203-217, 219-233, 235-253, 255-269, 271-285, 321-335, 337-351, 353-367, 369-399
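If the layer does take on counting, a small shared helper keeps the label order consistent; sketch only, with the counter vecs passed in rather than referencing the project's statics:

```rust
use chrono::Utc;
use prometheus::IntCounterVec;

// Hypothetical helper: count one attempted operation per call, labelled by
// provider and operation, with a second per-date series alongside it.
fn count_files_scanned(
    files_scanned: &IntCounterVec,      // e.g. STORAGE_FILES_SCANNED [provider, operation]
    files_scanned_date: &IntCounterVec, // e.g. STORAGE_FILES_SCANNED_DATE [provider, operation, date]
    provider: &str,
    operation: &str,
) {
    files_scanned.with_label_values(&[provider, operation]).inc();
    files_scanned_date
        .with_label_values(&[provider, operation, &Utc::now().date_naive().to_string()])
        .inc();
}
```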
src/storage/azure_blob.rs (1)
708-737
: HEAD “files scanned” should count attempts, not only successes.Currently HEAD increments STORAGE_FILES_SCANNED only on Ok. GCS HEAD increments regardless of outcome. For consistency and to match “files-scanned count for all object store APIs,” increment before match.
async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> { let head_start = Instant::now(); let result = self.client.head(&to_object_store_path(path)).await; let head_elapsed = head_start.elapsed().as_secs_f64(); - match &result { + // Count the attempted HEAD + STORAGE_FILES_SCANNED + .with_label_values(&["azure_blob", "HEAD"]) + .inc(); + STORAGE_FILES_SCANNED_DATE + .with_label_values(&["azure_blob", "HEAD", &Utc::now().date_naive().to_string()]) + .inc(); + match &result { Ok(_) => { STORAGE_REQUEST_RESPONSE_TIME .with_label_values(&["azure_blob", "HEAD", "200"]) .observe(head_elapsed); - // Record single file accessed - STORAGE_FILES_SCANNED - .with_label_values(&["azure_blob", "HEAD"]) - .inc(); - STORAGE_FILES_SCANNED_DATE - .with_label_values(&[ - "azure_blob", - "HEAD", - &Utc::now().date_naive().to_string(), - ]) - .inc(); }src/storage/localfs.rs (2)
404-426
: Optional: count PUT attempts regardless of outcome for consistency. Other providers increment STORAGE_FILES_SCANNED for PUT before inspecting the result; here it happens only on Ok. Consider moving the increment above the match.
     let res = fs::write(path, resource).await;
     let put_elapsed = put_start.elapsed().as_secs_f64();
+    // Count attempted PUT
+    STORAGE_FILES_SCANNED
+        .with_label_values(&["localfs", "PUT"])
+        .inc();
+    STORAGE_FILES_SCANNED_DATE
+        .with_label_values(&["localfs", "PUT", &Utc::now().date_naive().to_string()])
+        .inc();
     match &res {
         Ok(_) => {
             STORAGE_REQUEST_RESPONSE_TIME
                 .with_label_values(&["localfs", "PUT", "200"])
                 .observe(put_elapsed);
-            // Record single file written successfully
-            STORAGE_FILES_SCANNED
-                .with_label_values(&["localfs", "PUT"])
-                .inc();
-            STORAGE_FILES_SCANNED_DATE
-                .with_label_values(&["localfs", "PUT", &Utc::now().date_naive().to_string()])
-                .inc();
         }
838-867
: Avoid blocking the async runtime when copying large files. fs_extra::file::copy is blocking and can stall the Tokio executor under load; wrap it in spawn_blocking so other tasks are not starved.
-    let result = fs_extra::file::copy(path, to_path, &op);
+    let to_path_clone = to_path.clone();
+    let op_clone = op.clone();
+    let result = tokio::task::spawn_blocking(move || {
+        fs_extra::file::copy(path, to_path_clone, &op_clone)
+    })
+    .await
+    .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?;
1076-1096
: Count DELETE attempts, not only successes, for consistent “files scanned” semanticsElsewhere (GET/HEAD), you increment
STORAGE_FILES_SCANNED
before knowing the outcome, i.e., per attempt. Here it increments only on success. Move the counters before the match so DELETE aligns with the “attempts” model from the PR objectives.Apply this diff:
- match &result { - Ok(_) => { - STORAGE_REQUEST_RESPONSE_TIME - .with_label_values(&["s3", "DELETE", "200"]) - .observe(delete_elapsed); - // Record single file deleted - STORAGE_FILES_SCANNED - .with_label_values(&["s3", "DELETE"]) - .inc(); - STORAGE_FILES_SCANNED_DATE - .with_label_values(&["s3", "DELETE", &Utc::now().date_naive().to_string()]) - .inc(); - } + // Count the DELETE attempt regardless of outcome + STORAGE_FILES_SCANNED + .with_label_values(&["s3", "DELETE"]) + .inc(); + STORAGE_FILES_SCANNED_DATE + .with_label_values(&["s3", "DELETE", &Utc::now().date_naive().to_string()]) + .inc(); + match &result { + Ok(_) => { + STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["s3", "DELETE", "200"]) + .observe(delete_elapsed); + }
694-713
: Multipart uploads: confirm whether files-scanned should be recorded for PUT_MULTIPART and COMPLETESmall-file fallback increments
PUT
counters, but true multipart flows record only latency (no files-scanned). If the intent is “files-scanned count for all object store APIs,” add a single increment forPUT_MULTIPART
initiation and one forPUT_MULTIPART_COMPLETE
(attempts, not per-part).Suggested adjustments:
let async_writer = self.client.put_multipart(location).await; let multipart_elapsed = multipart_start.elapsed().as_secs_f64(); + // Count the multipart initiation attempt + STORAGE_FILES_SCANNED + .with_label_values(&["s3", "PUT_MULTIPART"]) + .inc(); + STORAGE_FILES_SCANNED_DATE + .with_label_values(&["s3", "PUT_MULTIPART", &Utc::now().date_naive().to_string()]) + .inc(); @@ - if let Err(err) = complete_result { + // Count the multipart completion attempt + STORAGE_FILES_SCANNED + .with_label_values(&["s3", "PUT_MULTIPART_COMPLETE"]) + .inc(); + STORAGE_FILES_SCANNED_DATE + .with_label_values(&["s3", "PUT_MULTIPART_COMPLETE", &Utc::now().date_naive().to_string()]) + .inc(); + if let Err(err) = complete_result { let status_code = error_to_status_code(&err); STORAGE_REQUEST_RESPONSE_TIME .with_label_values(&["s3", "PUT_MULTIPART_COMPLETE", status_code]) .observe(complete_elapsed);Note: I’m intentionally not incrementing for each
PUT_MULTIPART_PART
to avoid inflating counts by part numbers. If you do want that, add a single increment per part beside eachput_part
.Also applies to: 797-813
347-350
: Unify Instant usage; prefer the importedInstant
over fully qualifiedstd::time::Instant
Minor consistency/readability nit: the file already imports
time::Instant
. Use it everywhere.Apply this diff:
- let time = std::time::Instant::now(); + let time = Instant::now(); @@ - let time = std::time::Instant::now(); + let time = Instant::now();Also applies to: 379-382
src/metrics/storage.rs (2)
25-32
: Tune histogram buckets for storage latencies. Default buckets can be skewed for S3-like latencies; consider custom buckets (e.g., sub-10 ms up to tens of seconds) to capture both cache hits and cold-path operations better.
Example:
 pub static STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
-    HistogramVec::new(
-        HistogramOpts::new("storage_request_response_time", "Storage Request Latency")
-            .namespace(METRICS_NAMESPACE),
-        &["provider", "method", "status"],
-    )
+    {
+        let opts = HistogramOpts::new("storage_request_response_time", "Storage Request Latency")
+            .namespace(METRICS_NAMESPACE)
+            .buckets(vec![
+                0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5,
+                1.0, 2.0, 5.0, 10.0, 20.0, 30.0,
+            ]);
+        HistogramVec::new(opts, &["provider", "method", "status"])
+    }
     .expect("metric can be created")
 });
63-83
: DRY: centralize metric registration to remove duplicated code across providers. All four impls register the same three collectors; factor this into a helper to reduce duplication and keep future changes in one place (a compact standalone sketch follows below).
One way:
pub trait StorageMetrics { fn register_metrics(&self, handler: &PrometheusMetrics); } +fn register_storage_metrics(handler: &PrometheusMetrics) { + handler + .registry + .register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone())) + .expect("metric can be registered"); + handler + .registry + .register(Box::new(STORAGE_FILES_SCANNED.clone())) + .expect("metric can be registered"); + handler + .registry + .register(Box::new(STORAGE_FILES_SCANNED_DATE.clone())) + .expect("metric can be registered"); +} @@ impl StorageMetrics for FSConfig { fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { - handler - .registry - .register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone())) - .expect("metric can be registered"); - handler - .registry - .register(Box::new(STORAGE_FILES_SCANNED.clone())) - .expect("metric can be registered"); - handler - .registry - .register(Box::new(STORAGE_FILES_SCANNED_DATE.clone())) - .expect("metric can be registered"); + register_storage_metrics(handler); } } @@ impl StorageMetrics for S3Config { fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { - handler - .registry - .register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone())) - .expect("metric can be registered"); - handler - .registry - .register(Box::new(STORAGE_FILES_SCANNED.clone())) - .expect("metric can be registered"); - handler - .registry - .register(Box::new(STORAGE_FILES_SCANNED_DATE.clone())) - .expect("metric can be registered"); + register_storage_metrics(handler); } } @@ impl StorageMetrics for AzureBlobConfig { fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { - handler - .registry - .register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone())) - .expect("metric can be registered"); - handler - .registry - .register(Box::new(STORAGE_FILES_SCANNED.clone())) - .expect("metric can be registered"); - handler - .registry - .register(Box::new(STORAGE_FILES_SCANNED_DATE.clone())) - .expect("metric can be registered"); + register_storage_metrics(handler); } } @@ impl StorageMetrics for GcsConfig { fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { - handler - .registry - .register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone())) - .expect("metric can be registered"); - handler - .registry - .register(Box::new(STORAGE_FILES_SCANNED.clone())) - .expect("metric can be registered"); - handler - .registry - .register(Box::new(STORAGE_FILES_SCANNED_DATE.clone())) - .expect("metric can be registered"); + register_storage_metrics(handler); } }Note: Per the maintainer’s note and learnings, only one provider is initialized per process, so there’s no multi-registration panic risk; this change is purely to reduce duplication.
Also applies to: 86-107, 109-130, 132-153
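Shown standalone, the shared helper amounts to the few lines below. This assumes the three collectors are the Lazy statics defined in this file and that handler.registry is the same registry the current impls use:

use actix_web_prometheus::PrometheusMetrics;
use prometheus::core::Collector;

// Sketch of the shared registration helper; each provider impl would just call this.
fn register_storage_metrics(handler: &PrometheusMetrics) {
    let collectors: Vec<Box<dyn Collector>> = vec![
        Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone()),
        Box::new(STORAGE_FILES_SCANNED.clone()),
        Box::new(STORAGE_FILES_SCANNED_DATE.clone()),
    ];
    for collector in collectors {
        handler
            .registry
            .register(collector)
            .expect("metric can be registered");
    }
}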
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (14)
src/catalog/mod.rs
(1 hunks)src/metadata.rs
(3 hunks)src/metrics/mod.rs
(6 hunks)src/metrics/storage.rs
(1 hunks)src/query/listing_table_builder.rs
(2 hunks)src/query/mod.rs
(1 hunks)src/query/stream_schema_provider.rs
(12 hunks)src/stats.rs
(3 hunks)src/storage/azure_blob.rs
(24 hunks)src/storage/gcs.rs
(23 hunks)src/storage/localfs.rs
(15 hunks)src/storage/metrics_layer.rs
(8 hunks)src/storage/object_storage.rs
(2 hunks)src/storage/s3.rs
(24 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- src/query/mod.rs
- src/catalog/mod.rs
- src/storage/object_storage.rs
- src/metadata.rs
- src/stats.rs
- src/query/listing_table_builder.rs
🧰 Additional context used
🧠 Learnings (7)
📚 Learning: 2025-08-25T01:31:41.768Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.768Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.
Applied to files:
src/storage/gcs.rs
src/storage/metrics_layer.rs
src/metrics/mod.rs
src/storage/localfs.rs
src/storage/s3.rs
src/metrics/storage.rs
src/storage/azure_blob.rs
📚 Learning: 2025-08-25T01:32:25.937Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.937Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.
Applied to files:
src/storage/gcs.rs
src/metrics/mod.rs
src/storage/localfs.rs
src/metrics/storage.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.
Applied to files:
src/storage/gcs.rs
src/storage/s3.rs
src/storage/azure_blob.rs
📚 Learning: 2025-08-21T14:41:55.462Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1410
File: src/storage/object_storage.rs:876-916
Timestamp: 2025-08-21T14:41:55.462Z
Learning: In Parseable's object storage system (src/storage/object_storage.rs), date directories (date=YYYY-MM-DD) are only created when there's actual data to store, which means they will always contain corresponding hour and minute subdirectories. There can be no case where a date directory exists without hour or minute subdirectories.
Applied to files:
src/storage/gcs.rs
src/storage/localfs.rs
src/storage/s3.rs
src/storage/azure_blob.rs
📚 Learning: 2025-08-21T11:47:01.279Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1410
File: src/storage/object_storage.rs:0-0
Timestamp: 2025-08-21T11:47:01.279Z
Learning: In Parseable's object storage implementation (src/storage/object_storage.rs), the hour and minute directory prefixes (hour=XX, minute=YY) are generated from arrow file timestamps following proper datetime conventions, so they are guaranteed to be within valid ranges (0-23 for hours, 0-59 for minutes) and don't require additional range validation.
Applied to files:
src/storage/gcs.rs
src/storage/s3.rs
src/storage/azure_blob.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/query/stream_schema_provider.rs
📚 Learning: 2025-08-25T01:33:36.398Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/metrics/storage.rs:51-68
Timestamp: 2025-08-25T01:33:36.398Z
Learning: In the Parseable storage system, only one storage provider (localfs, s3, azureblob, or gcs) is meant to be initialized per process, which ensures that global metrics like STORAGE_REQUEST_RESPONSE_TIME and STORAGE_FILES_SCANNED are registered exactly once without risk of duplicate registration panics.
Applied to files:
src/metrics/storage.rs
🧬 Code graph analysis (6)
src/storage/gcs.rs (2)
src/storage/metrics_layer.rs (3)
error_to_status_code
(40-66)new
(75-80)head
(255-269)src/storage/object_storage.rs (3)
parseable_json_path
(1281-1283)new
(82-91)head
(209-209)
src/storage/metrics_layer.rs (3)
src/storage/azure_blob.rs (1)
head
(708-738)src/storage/gcs.rs (1)
head
(695-720)src/storage/s3.rs (3)
head
(865-890)from
(1434-1442)from
(1446-1448)
src/query/stream_schema_provider.rs (3)
src/parseable/mod.rs (4)
storage
(244-246)new
(146-158)serde_json
(300-300)serde_json
(306-306)src/enterprise/utils.rs (1)
collect_manifest_files
(159-182)src/storage/s3.rs (2)
from
(1434-1442)from
(1446-1448)
src/storage/localfs.rs (2)
src/storage/metrics_layer.rs (1)
copy
(337-351)src/storage/azure_blob.rs (3)
get_ingestor_meta_file_paths
(812-850)delete_object
(919-950)check
(952-981)
src/storage/s3.rs (3)
src/storage/metrics_layer.rs (3)
error_to_status_code
(40-66)new
(75-80)head
(255-269)src/storage/localfs.rs (6)
new
(104-106)name
(76-78)from
(985-987)head
(136-150)get_ingestor_meta_file_paths
(190-242)check
(496-520)src/storage/azure_blob.rs (12)
resp
(347-353)resp
(477-483)resp
(1240-1245)resp
(1284-1289)_delete_prefix
(278-333)_list_streams
(335-400)name
(169-171)_list_dates
(402-443)_upload_file
(529-557)head
(708-738)get_ingestor_meta_file_paths
(812-850)check
(952-981)
src/storage/azure_blob.rs (3)
src/storage/metrics_layer.rs (3)
error_to_status_code
(40-66)new
(75-80)head
(255-269)src/storage/object_storage.rs (4)
parseable_json_path
(1281-1283)new
(82-91)name
(200-200)head
(209-209)src/storage/gcs.rs (9)
resp
(312-318)resp
(442-448)resp
(1206-1211)resp
(1246-1251)_delete_prefix
(243-298)_list_streams
(300-365)name
(130-132)_upload_file
(494-522)head
(695-720)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
🔇 Additional comments (6)
src/query/stream_schema_provider.rs (3)
19-67
: Imports and storage/metrics wiring look correct
- Moving to PathBuf/RelativePathBuf and the ObjectStorage abstraction is consistent with the PR objectives.
- TryFutureExt is used later (and_then chain), so no unused import.
- Metrics imports (STORAGE_FILES_SCANNED, STORAGE_FILES_SCANNED_DATE) align with the new counters.
284-301
: populate_via_listing now accepts ObjectStorage — good abstraction boundary. Switching ListingTableBuilder::populate_via_listing to take the storage layer rather than a raw object_store keeps the query layer decoupled and lets the storage layer own metrics and provider-specific logic. The TryFutureExt.and_then usage keeps the flow clean.
404-422
: Good: storage plumbed through collect_from_snapshot with proper error mapping. Propagating ObjectStorageError via DataFusionError::External is the right choice here; it preserves the original error chain for diagnostics.
src/metrics/mod.rs (2)
268-276
: Registration of TOTAL_*_DATE metrics looks correct. All three TOTAL_*_DATE metrics are registered in the custom registry; no gaps spotted.
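For readers skimming without the diff, the per-date pattern being verified boils down to a date-labeled counter like the sketch below; the metric name, namespace, and label set are simplified stand-ins for the real definitions in src/metrics/mod.rs:

use chrono::Utc;
use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, Opts, Registry};

// Simplified stand-in for the per-date counters in this PR.
static EVENTS_INGESTED_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new("events_ingested_date", "events ingested on a particular date")
            .namespace("parseable"),
        &["stream", "format", "date"],
    )
    .expect("metric can be created")
});

fn main() {
    let registry = Registry::new();
    registry
        .register(Box::new(EVENTS_INGESTED_DATE.clone()))
        .expect("metric can be registered");

    // Counters accumulate with inc_by(); they are never overwritten with set()
    // the way the old gauges were.
    let today = Utc::now().date_naive().to_string();
    EVENTS_INGESTED_DATE
        .with_label_values(&["demo_stream", "json", &today])
        .inc_by(42);
}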
127-137
: It looks like there are no remaining.set()
calls on those three metrics—so all callsites have been updated to useinc()
/inc_by()
.src/storage/metrics_layer.rs (1)
37-66
: Status mapping helper and provider label plumbing look solid. The error_to_status_code coverage is pragmatic, and the provider field in MetricLayer enables per-provider partitioning without code duplication. No issues here.
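For context, the shape of that mapping and how it feeds the labeled histogram is roughly the following; this is a trimmed sketch, not the full variant coverage in src/storage/metrics_layer.rs:

use object_store::Error as ObjectStoreError;

// Trimmed sketch: the real error_to_status_code covers more variants.
fn error_to_status_code(err: &ObjectStoreError) -> &'static str {
    match err {
        ObjectStoreError::NotFound { .. } => "404",
        ObjectStoreError::AlreadyExists { .. } => "409",
        ObjectStoreError::Precondition { .. } => "412",
        _ => "500",
    }
}

// The status string then becomes the third label on the latency histogram, e.g.
// STORAGE_REQUEST_RESPONSE_TIME
//     .with_label_values(&["s3", "GET", error_to_status_code(&err)])
//     .observe(elapsed);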
pub async fn collect_manifest_files( | ||
storage: Arc<dyn ObjectStore>, | ||
storage: Arc<dyn ObjectStorage>, | ||
manifest_urls: Vec<String>, | ||
) -> Result<Vec<Manifest>, object_store::Error> { | ||
let tasks = manifest_urls.into_iter().map(|path| { | ||
let path = Path::parse(path).unwrap(); | ||
) -> Result<Vec<Manifest>, ObjectStorageError> { | ||
let mut tasks = Vec::new(); | ||
manifest_urls.into_iter().for_each(|path| { | ||
let path = RelativePathBuf::from_path(PathBuf::from(path)).expect("Invalid path"); | ||
let storage = Arc::clone(&storage); | ||
async move { storage.get(&path).await } | ||
tasks.push(tokio::task::spawn(async move { | ||
storage.get_object(&path).await | ||
})); | ||
}); | ||
|
||
let resp = FuturesOrdered::from_iter(tasks) | ||
.and_then(|res| res.bytes()) | ||
.collect::<Vec<object_store::Result<Bytes>>>() | ||
.await; | ||
let mut op = Vec::new(); | ||
for task in tasks { | ||
let file = task.await??; | ||
op.push(file); | ||
} | ||
|
||
Ok(resp | ||
Ok(op | ||
.into_iter() | ||
.flat_map(|res| res.ok()) | ||
.map(|bytes| serde_json::from_slice(&bytes).unwrap()) | ||
.map(|res| serde_json::from_slice(&res).expect("Data is invalid for Manifest")) | ||
.collect()) | ||
} |
Harden manifest collection: eliminate panics, bound concurrency, keep error context
Current issues:
- Path conversion and JSON deserialization use expect(), which can panic on corrupt manifests or bad paths.
- Spawning an unbounded number of tasks scales poorly with large manifest lists.
- Error context (which path failed) is lost.
Refactor to:
- Bound concurrency with buffer_unordered.
- Map errors into ObjectStorageError without panicking.
- Keep the error chain so provider/status can be surfaced by callers.
Apply this diff:
-pub async fn collect_manifest_files(
- storage: Arc<dyn ObjectStorage>,
- manifest_urls: Vec<String>,
-) -> Result<Vec<Manifest>, ObjectStorageError> {
- let mut tasks = Vec::new();
- manifest_urls.into_iter().for_each(|path| {
- let path = RelativePathBuf::from_path(PathBuf::from(path)).expect("Invalid path");
- let storage = Arc::clone(&storage);
- tasks.push(tokio::task::spawn(async move {
- storage.get_object(&path).await
- }));
- });
-
- let mut op = Vec::new();
- for task in tasks {
- let file = task.await??;
- op.push(file);
- }
-
- Ok(op
- .into_iter()
- .map(|res| serde_json::from_slice(&res).expect("Data is invalid for Manifest"))
- .collect())
-}
+pub async fn collect_manifest_files(
+ storage: Arc<dyn ObjectStorage>,
+ manifest_urls: Vec<String>,
+) -> Result<Vec<Manifest>, ObjectStorageError> {
+ // Bound concurrency to prevent unbounded task fan-out for large manifest sets.
+ let concurrency = std::cmp::min(32, std::cmp::max(1, num_cpus::get() * 2));
+
+ let results = futures_util::stream::iter(manifest_urls.into_iter())
+ .map(|path| {
+ let storage = Arc::clone(&storage);
+ async move {
+ let path = RelativePathBuf::from_path(PathBuf::from(path))
+ .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?;
+ storage.get_object(&path).await
+ }
+ })
+ .buffer_unordered(concurrency)
+ .collect::<Vec<_>>()
+ .await;
+
+ let mut manifests = Vec::with_capacity(results.len());
+ for res in results {
+ let bytes = res?;
+ let manifest: Manifest = serde_json::from_slice(&bytes)
+ .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?;
+ manifests.push(manifest);
+ }
+ Ok(manifests)
+}
Additionally, add these imports near the other futures imports:
use futures_util::stream::{self, StreamExt};
If you’d like, I can wire in richer error variants (e.g., InvalidPath, InvalidManifest) on ObjectStorageError and convert serde/path errors accordingly, so observability improves without relying on “UnhandledError”.
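As a self-contained illustration of the bounded-concurrency idea (the fetch function and the limit of 16 here are placeholders, not the PR's API):

use futures_util::stream::{self, StreamExt};

// Placeholder for storage.get_object(&path) in the suggestion above.
async fn fetch(path: String) -> Result<Vec<u8>, std::io::Error> {
    Ok(path.into_bytes())
}

#[tokio::main]
async fn main() {
    let paths: Vec<String> = (0..100).map(|i| format!("manifest-{i}.json")).collect();

    // At most 16 requests are in flight at any time; results arrive as they finish.
    let results: Vec<Result<Vec<u8>, std::io::Error>> = stream::iter(paths)
        .map(fetch)
        .buffer_unordered(16)
        .collect()
        .await;

    println!("fetched {} manifests", results.iter().filter(|r| r.is_ok()).count());
}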
🤖 Prompt for AI Agents
In src/query/stream_schema_provider.rs around lines 853 to 876, replace the
current unbounded spawn+expect-based implementation: convert each manifest URL
to a Path once (returning a mapped ObjectStorageError instead of panicking on
invalid path), build a futures stream from the path list, use
stream::iter(...).map(|path| async move {
storage.get_object(&path).await.map(|b| (path, b)) }).buffer_unordered(N) to
bound concurrency, collect results while mapping any errors into
ObjectStorageError that include the failing path (preserve the original error as
source), and parse JSON with serde_json::from_slice mapping deserialization
failures into ObjectStorageError with path context instead of expect; also add
the import use futures_util::stream::{self, StreamExt}; to enable
buffer_unordered. Ensure all error mappings carry path info so callers can
surface provider/status.
STORAGE_REQUEST_RESPONSE_TIME | ||
.with_label_values(&["azure_blob", "GET", "200"]) | ||
.observe(list_start.elapsed().as_secs_f64()); | ||
STORAGE_FILES_SCANNED | ||
.with_label_values(&["azure_blob", "GET"]) | ||
.inc(); | ||
STORAGE_FILES_SCANNED_DATE | ||
.with_label_values(&["azure_blob", "GET", &Utc::now().date_naive().to_string()]) | ||
.inc(); |
🛠️ Refactor suggestion
Remove duplicate GET timing and counting from get_objects loop.
Same duplication as in GCS: _get_object() already records GET timing and GET files-scanned. The loop uses list_start for GET latency and increments again.
- STORAGE_REQUEST_RESPONSE_TIME
- .with_label_values(&["azure_blob", "GET", "200"])
- .observe(list_start.elapsed().as_secs_f64());
- STORAGE_FILES_SCANNED
- .with_label_values(&["azure_blob", "GET"])
- .inc();
- STORAGE_FILES_SCANNED_DATE
- .with_label_values(&["azure_blob", "GET", &Utc::now().date_naive().to_string()])
- .inc();
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
STORAGE_REQUEST_RESPONSE_TIME | |
.with_label_values(&["azure_blob", "GET", "200"]) | |
.observe(list_start.elapsed().as_secs_f64()); | |
STORAGE_FILES_SCANNED | |
.with_label_values(&["azure_blob", "GET"]) | |
.inc(); | |
STORAGE_FILES_SCANNED_DATE | |
.with_label_values(&["azure_blob", "GET", &Utc::now().date_naive().to_string()]) | |
.inc(); |
🤖 Prompt for AI Agents
In src/storage/azure_blob.rs around lines 784 to 792, the loop is redundantly
recording GET metrics that are already recorded by get_object(); remove the
three metric updates inside this loop (the STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["azure_blob","GET","200"]).observe(...),
STORAGE_FILES_SCANNED .with_label_values(&["azure_blob","GET"]).inc(), and
STORAGE_FILES_SCANNED_DATE
.with_label_values(&["azure_blob","GET",&Utc::now().date_naive().to_string()]).inc())
so that GET timing and files-scanned are only recorded once by get_object();
leave any non-GET or listing metrics intact.
match resp { | ||
Ok(resp) => { | ||
let time = instant.elapsed().as_secs_f64(); | ||
REQUEST_RESPONSE_TIME | ||
.with_label_values(&["GET", "200"]) | ||
.observe(time); | ||
let body = resp.bytes().await.unwrap(); | ||
let body: Bytes = resp.bytes().await.unwrap(); | ||
STORAGE_REQUEST_RESPONSE_TIME | ||
.with_label_values(&["s3", "GET", "200"]) | ||
.observe(elapsed); | ||
Ok(body) | ||
} | ||
Err(err) => { | ||
let time = instant.elapsed().as_secs_f64(); | ||
REQUEST_RESPONSE_TIME | ||
.with_label_values(&["GET", "400"]) | ||
.observe(time); | ||
let status_code = error_to_status_code(&err); | ||
STORAGE_REQUEST_RESPONSE_TIME | ||
.with_label_values(&["s3", "GET", status_code]) | ||
.observe(elapsed); | ||
Err(err.into()) | ||
} | ||
} |
Avoid panic on GET body read; record error status instead of unwrap()
resp.bytes().await.unwrap()
can panic and will also mislabel the request as 200 if the body read fails after a successful GET. Handle the body read error, map it to a status via error_to_status_code
, and return a proper ObjectStorageError
. Also keep the latency observation consistent for both success and failure paths.
Apply this diff:
- Ok(resp) => {
- let body: Bytes = resp.bytes().await.unwrap();
- STORAGE_REQUEST_RESPONSE_TIME
- .with_label_values(&["s3", "GET", "200"])
- .observe(elapsed);
- Ok(body)
- }
+ Ok(resp) => {
+ match resp.bytes().await {
+ Ok(body) => {
+ STORAGE_REQUEST_RESPONSE_TIME
+ .with_label_values(&["s3", "GET", "200"])
+ .observe(elapsed);
+ Ok(body)
+ }
+ Err(err) => {
+ let status_code = error_to_status_code(&err);
+ STORAGE_REQUEST_RESPONSE_TIME
+ .with_label_values(&["s3", "GET", status_code])
+ .observe(elapsed);
+ Err(err.into())
+ }
+ }
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
match resp { | |
Ok(resp) => { | |
let time = instant.elapsed().as_secs_f64(); | |
REQUEST_RESPONSE_TIME | |
.with_label_values(&["GET", "200"]) | |
.observe(time); | |
let body = resp.bytes().await.unwrap(); | |
let body: Bytes = resp.bytes().await.unwrap(); | |
STORAGE_REQUEST_RESPONSE_TIME | |
.with_label_values(&["s3", "GET", "200"]) | |
.observe(elapsed); | |
Ok(body) | |
} | |
Err(err) => { | |
let time = instant.elapsed().as_secs_f64(); | |
REQUEST_RESPONSE_TIME | |
.with_label_values(&["GET", "400"]) | |
.observe(time); | |
let status_code = error_to_status_code(&err); | |
STORAGE_REQUEST_RESPONSE_TIME | |
.with_label_values(&["s3", "GET", status_code]) | |
.observe(elapsed); | |
Err(err.into()) | |
} | |
} | |
match resp { | |
Ok(resp) => { | |
match resp.bytes().await { | |
Ok(body) => { | |
STORAGE_REQUEST_RESPONSE_TIME | |
.with_label_values(&["s3", "GET", "200"]) | |
.observe(elapsed); | |
Ok(body) | |
} | |
Err(err) => { | |
let status_code = error_to_status_code(&err); | |
STORAGE_REQUEST_RESPONSE_TIME | |
.with_label_values(&["s3", "GET", status_code]) | |
.observe(elapsed); | |
Err(err.into()) | |
} | |
} | |
} | |
Err(err) => { | |
let status_code = error_to_status_code(&err); | |
STORAGE_REQUEST_RESPONSE_TIME | |
.with_label_values(&["s3", "GET", status_code]) | |
.observe(elapsed); | |
Err(err.into()) | |
} | |
} |
🤖 Prompt for AI Agents
In src/storage/s3.rs around lines 356 to 371, resp.bytes().await.unwrap() may
panic and incorrectly label the request as "200" even when reading the body
fails; change the flow to await resp.bytes().await and match its Result so that
body-read errors are handled: on Ok(body) record latency with label
["s3","GET","200"] and return the body, on Err(e) map the error to a status
string using error_to_status_code(&e), observe the latency with that status,
convert the bytes-read error into the appropriate ObjectStorageError (or map
into the function's error type) and return it instead of panicking; keep the
original resp error handling but ensure both response- and body-read-failure
paths use the same latency observation logic and correct status labeling.
STORAGE_REQUEST_RESPONSE_TIME | ||
.with_label_values(&["s3", "GET", "200"]) | ||
.observe(list_start.elapsed().as_secs_f64()); | ||
STORAGE_FILES_SCANNED | ||
.with_label_values(&["s3", "GET"]) | ||
.inc(); | ||
STORAGE_FILES_SCANNED_DATE | ||
.with_label_values(&["s3", "GET", &Utc::now().date_naive().to_string()]) | ||
.inc(); |
💡 Verification agent
🧩 Analysis chain
Remove incorrect GET metrics in get_objects() (double count + wrong latency source)
This block double-instruments GETs already covered inside _get_object()
and uses list_start
(LIST latency) for the GET histogram, skewing the data. Rely on _get_object()
for GET metrics and keep LIST metrics scoped to the listing.
Apply this diff to drop the duplicate/incorrect GET metrics:
- STORAGE_REQUEST_RESPONSE_TIME
- .with_label_values(&["s3", "GET", "200"])
- .observe(list_start.elapsed().as_secs_f64());
- STORAGE_FILES_SCANNED
- .with_label_values(&["s3", "GET"])
- .inc();
- STORAGE_FILES_SCANNED_DATE
- .with_label_values(&["s3", "GET", &Utc::now().date_naive().to_string()])
- .inc();
To confirm there are no remaining duplicate GET histograms under "s3", run:
🏁 Script executed:
#!/bin/bash
# Find all GET histogram observations under the s3 provider
rg -nP --type=rust -C2 'STORAGE_REQUEST_RESPONSE_TIME.*\["s3",\s*"GET"'
Length of output: 73
🏁 Script executed:
#!/bin/bash
# Display lines around the GET instrumentation in get_objects (approx. lines 936-944)
sed -n '900,960p' src/storage/s3.rs || echo 'Cannot read src/storage/s3.rs'
echo "\n--- Locate _get_object definition and its GET metrics ---"
rg -nP 'fn\s+_get_object' -C5 src/storage/s3.rs || echo 'fn _get_object not found'
rg -nP 'STORAGE_.*with_label_values.*"s3",\s*"GET"' -C3 src/storage/s3.rs || echo 'No GET metrics found in _get_object or elsewhere'
Length of output: 2969
🏁 Script executed:
#!/bin/bash
# Display complete _get_object implementation to verify GET metrics
sed -n '330,420p' src/storage/s3.rs || echo "Failed to read src/storage/s3.rs"
echo -e "\n--- All occurrences of GET labels in s3.rs ---"
rg -n '\["s3",\s*"GET"' -C2 src/storage/s3.rs || echo "No GET label series found"
Length of output: 4786
Remove duplicate GET metrics in get_objects()
The instrumentation block at src/storage/s3.rs:936–944 duplicates the GET metrics already emitted by _get_object()
, resulting in double-counted operations and skewed latency values. Please drop this snippet and rely on the existing _get_object()
instrumentation to capture GET metrics.
• File: src/storage/s3.rs
– Lines 936–944 (inside get_objects()
)
• Reference: _get_object()
at lines 346–365 already records
– STORAGE_FILES_SCANNED
/ STORAGE_FILES_SCANNED_DATE
for GET
– STORAGE_REQUEST_RESPONSE_TIME
for GET (success and error)
Apply this diff to remove the duplicate GET metrics:
- STORAGE_REQUEST_RESPONSE_TIME
- .with_label_values(&["s3", "GET", "200"])
- .observe(list_start.elapsed().as_secs_f64());
- STORAGE_FILES_SCANNED
- .with_label_values(&["s3", "GET"])
- .inc();
- STORAGE_FILES_SCANNED_DATE
- .with_label_values(&["s3", "GET", &Utc::now().date_naive().to_string()])
- .inc();
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
STORAGE_REQUEST_RESPONSE_TIME | |
.with_label_values(&["s3", "GET", "200"]) | |
.observe(list_start.elapsed().as_secs_f64()); | |
STORAGE_FILES_SCANNED | |
.with_label_values(&["s3", "GET"]) | |
.inc(); | |
STORAGE_FILES_SCANNED_DATE | |
.with_label_values(&["s3", "GET", &Utc::now().date_naive().to_string()]) | |
.inc(); |
🤖 Prompt for AI Agents
In src/storage/s3.rs around lines 936 to 944, remove the instrumentation snippet
that emits STORAGE_REQUEST_RESPONSE_TIME, STORAGE_FILES_SCANNED, and
STORAGE_FILES_SCANNED_DATE for "s3"/"GET" inside get_objects(); these metrics
are already recorded in _get_object() (lines ~346–365) and must not be
duplicated — delete that block and ensure no other duplicate GET metric
emissions remain in get_objects().
Summary by CodeRabbit