
Conversation


@nikhilsinhaparseable commented Aug 24, 2025

  1. Add a files-scanned count for all object store APIs, labeled with provider name and method called.
  2. Add time elapsed for each API, labeled with provider name, method name, and response status code (a sketch of these provider-labeled metrics follows this list).
  3. Add total events ingested, ingestion size, and storage size across all streams for a day.
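
A minimal sketch of what such provider-labeled metrics can look like with the prometheus crate. The static names mirror the ones discussed in this review, but the exact options (namespace, help text) and the sample usage are assumptions, not the PR's actual code:

use once_cell::sync::Lazy;
use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, Opts};

// Histogram keyed by provider/method/status, as described in the PR summary.
static STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
    HistogramVec::new(
        HistogramOpts::new("storage_request_response_time", "Storage request latency")
            .namespace("parseable"),
        &["provider", "method", "status"],
    )
    .expect("metric can be created")
});

// Counter of files touched per provider and operation.
static STORAGE_FILES_SCANNED: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new("storage_files_scanned", "Files scanned per storage operation")
            .namespace("parseable"),
        &["provider", "method"],
    )
    .expect("metric can be created")
});

fn main() {
    // Record one successful GET that took 42 ms.
    STORAGE_REQUEST_RESPONSE_TIME
        .with_label_values(&["s3", "GET", "200"])
        .observe(0.042);
    STORAGE_FILES_SCANNED
        .with_label_values(&["s3", "GET"])
        .inc();
}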

Summary by CodeRabbit

  • New Features
    • Added global, provider-labeled storage telemetry with operation/status timings and per-operation counters (including multipart steps).
    • Introduced "files scanned" counters and per-date total event/size counters; per-stream/date metrics now accumulate.
  • Refactor
    • Unified storage instrumentation across providers and centralized metric labeling.
    • Query/storage integration simplified: listing/manifest collection now uses a storage abstraction and streamlined listing flow.
  • Bug Fixes
    • Improved daily metric accumulation and stricter error propagation for storage metadata operations.


coderabbitai bot commented Aug 24, 2025

Walkthrough

Converts several per-date metrics from gauges to counters and adds total-per-date gauges; centralizes storage telemetry into global histograms/counters with provider labels and error-status mapping; instruments storage backends; and refactors query/listing code to use the ObjectStorage abstraction, updating related signatures.
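
To make the gauge-to-counter switch concrete: per-stream, per-date series only ever accumulate (counter), while the new cross-stream daily totals stay adjustable (gauge). A small illustrative sketch; label sets and values are assumptions, not lifted from the diff:

use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, IntGaugeVec, Opts};

// Per-stream, per-date accounting: monotonic, so a counter fits.
static EVENTS_INGESTED_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new("events_ingested_date", "Events ingested for a stream on a date"),
        &["stream", "format", "date"],
    )
    .expect("metric can be created")
});

// Cross-stream total for a date: must go down when a stream is deleted, so a gauge.
static TOTAL_EVENTS_INGESTED_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
    IntGaugeVec::new(
        Opts::new("total_events_ingested_date", "Total events ingested on a date"),
        &["date"],
    )
    .expect("metric can be created")
});

fn main() {
    // Ingest a batch of 128 events for one stream.
    EVENTS_INGESTED_DATE
        .with_label_values(&["app_logs", "json", "2025-08-24"])
        .inc_by(128);
    TOTAL_EVENTS_INGESTED_DATE
        .with_label_values(&["2025-08-24"])
        .add(128);

    // Deleting that stream later removes its series and subtracts from the total.
    let _ = EVENTS_INGESTED_DATE.remove_label_values(&["app_logs", "json", "2025-08-24"]);
    TOTAL_EVENTS_INGESTED_DATE
        .with_label_values(&["2025-08-24"])
        .sub(128);
}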

Changes

  • Metrics core and metadata (src/metrics/mod.rs, src/metrics/storage.rs, src/metadata.rs, src/stats.rs)
    Switch per-date event metrics from gauges to counters (IntGaugeVec → IntCounterVec), add TOTAL_EVENTS_*_DATE gauges, register new storage metrics, update call sites to use inc_by/inc/sub/remove_label for totals, and expose new public statics.
  • Catalog small fix (src/catalog/mod.rs)
    Adjust metric value reads in extract_partition_metrics to use metric.get() without casting to u64.
  • ObjectStorage refactor: query layer (src/query/listing_table_builder.rs, src/query/mod.rs, src/query/stream_schema_provider.rs)
    Remove direct ObjectStore usage, make listing and manifest collection use Arc<dyn ObjectStorage>, simplify the populate_via_listing signature and listing flow, and change collect_manifest_files signatures/returns to ObjectStorageError.
  • Global storage telemetry & metric layer (src/storage/metrics_layer.rs, src/metrics/storage.rs)
    Introduce global STORAGE_REQUEST_RESPONSE_TIME (HistogramVec), STORAGE_FILES_SCANNED, and STORAGE_FILES_SCANNED_DATE (CounterVecs); add the error_to_status_code helper; make MetricLayer provider-aware and label metrics with provider/operation/status.
  • Provider instrumentation, high churn (src/storage/s3.rs, src/storage/gcs.rs, src/storage/azure_blob.rs, src/storage/localfs.rs, src/storage/object_storage.rs)
    Replace per-provider histograms with the global metrics, add per-operation timing and status labeling, add files-scanned/date counters, add multipart/part timing where relevant, map object_store errors to status codes, and update update_storage_metrics to accumulate per-date totals (TOTAL_EVENTS_STORAGE_SIZE_DATE). A self-contained sketch of this timing-and-status pattern follows this list.
  • Catalog/stats usage sites (src/stats.rs, src/metadata.rs, src/storage/object_storage.rs)
    Update metric updates and deletions to use counters/gauges appropriately, accumulate totals across manifests/dates, and subtract totals on deletion processing.
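
The shared pattern across the instrumented providers is: start a timer, run the call, derive a status label from the result, observe once per attempt. A hedged, self-contained sketch of that shape, using a local histogram and std::io::Error as a stand-in for object_store errors and the real error_to_status_code helper:

use std::time::Instant;

use prometheus::{HistogramOpts, HistogramVec};

// Generic wrapper illustrating the timing/status pattern; the real providers
// inline this around each object_store call rather than using a helper.
fn observe_operation<T, E>(
    histogram: &HistogramVec,
    provider: &str,
    method: &str,
    status_of: impl Fn(&E) -> &'static str,
    op: impl FnOnce() -> Result<T, E>,
) -> Result<T, E> {
    let start = Instant::now();
    let result = op();
    let elapsed = start.elapsed().as_secs_f64();
    let status = match &result {
        Ok(_) => "200",
        Err(err) => status_of(err),
    };
    histogram
        .with_label_values(&[provider, method, status])
        .observe(elapsed);
    result
}

fn main() {
    let histogram = HistogramVec::new(
        HistogramOpts::new("storage_request_response_time", "Storage request latency"),
        &["provider", "method", "status"],
    )
    .expect("metric can be created");

    // Simulate a HEAD that fails with a not-found error and gets labeled "404".
    let _ = observe_operation(
        &histogram,
        "s3",
        "HEAD",
        |_err: &std::io::Error| "404",
        || -> Result<(), std::io::Error> {
            Err(std::io::Error::new(std::io::ErrorKind::NotFound, "missing"))
        },
    );
}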

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Q as Query Layer
  participant S as ObjectStorage (abstraction)
  participant P as Provider (s3/gcs/azure/localfs)
  participant M as Metrics

  rect rgba(200,220,255,0.25)
    Note over Q,S: Listing & manifest collection (new flow)
    Q->>S: list_dirs_relative(stream/prefix)
    S->>P: LIST
    P-->>S: paths
    S-->>Q: relative paths
    Q->>S: get_object(manifest_path) [concurrent]
    S->>P: GET manifest
    P-->>S: bytes / error
    S->>M: STORAGE_REQUEST_RESPONSE_TIME{provider, "GET", status}
    S->>M: STORAGE_FILES_SCANNED{provider,"GET"} += 1
    Q-->>Q: parse manifests
  end
sequenceDiagram
  autonumber
  participant App as Stats/Update
  participant M as Metrics Registry

  Note over App,M: Per-date event accounting
  App->>M: EVENTS_INGESTED_DATE.inc_by(n)
  App->>M: EVENTS_INGESTED_SIZE_DATE.inc_by(bytes)
  App->>M: EVENTS_STORAGE_SIZE_DATE.inc_by(bytes)
  App->>M: TOTAL_EVENTS_INGESTED_DATE.add_or_inc(...)   %% total-per-date gauge updated
  App->>M: TOTAL_EVENTS_STORAGE_SIZE_DATE.add_or_inc(...)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • de-sh
  • parmesant

Poem

A rabbit hops through code and logs,
Counters climb where gauges once froze.
Lists and GETs now sing with tags,
Providers shout in labelled flags.
Hop — metrics bloom, observability grows. 🥕



@coderabbitai bot left a comment


Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (6)
src/query/stream_schema_provider.rs (1)

578-585: Fix type mismatch: ObjectStoreUrl::parse expects &str, not url::Url.

This won’t compile. Pass a string slice (or format!) instead.

-            ObjectStoreUrl::parse(storage.store_url()).unwrap(),
+            ObjectStoreUrl::parse(storage.store_url().as_str()).unwrap(),
src/storage/gcs.rs (1)

707-756: Duplicate GET timing and counting inside get_objects; rely on _get_object metrics.

self.get_object(...) already records GET timing and counts. The subsequent REQUEST_RESPONSE_TIME GET observation and FILES_SCANNED GET increment here double-count, and the timing uses the LIST stopwatch, not the GET duration.

-            STORAGE_REQUEST_RESPONSE_TIME
-                .with_label_values(&["gcs", "GET", "200"])
-                .observe(list_start.elapsed().as_secs_f64());
-            STORAGE_FILES_SCANNED
-                .with_label_values(&["gcs", "GET"])
-                .inc();
src/storage/metrics_layer.rs (1)

294-303: Fix label cardinality mismatch in StreamMetricWrapper (will panic at runtime).

STORAGE_REQUEST_RESPONSE_TIME expects 3 labels [provider, method, status], but StreamMetricWrapper supplies only ["LIST","200"]. This will trigger a panic on observe. Also, the wrapper stores &'static str labels, which prevents passing the dynamic provider string.

Refactor the wrapper to carry provider and emit all three labels.

Apply this diff:

@@
-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
         let time = time::Instant::now();
         let inner = self.inner.list(prefix);
-        let res = StreamMetricWrapper {
-            time,
-            labels: ["LIST", "200"],
-            inner,
-        };
+        let res = StreamMetricWrapper {
+            time,
+            provider: &self.provider,
+            method: "LIST",
+            status: "200",
+            inner,
+        };
         Box::pin(res)
     }
@@
-    fn list_with_offset(
+    fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
     ) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
         let time = time::Instant::now();
         let inner = self.inner.list_with_offset(prefix, offset);
-        let res = StreamMetricWrapper {
-            time,
-            labels: ["LIST_OFFSET", "200"],
-            inner,
-        };
+        let res = StreamMetricWrapper {
+            time,
+            provider: &self.provider,
+            method: "LIST_OFFSET",
+            status: "200",
+            inner,
+        };
 
         Box::pin(res)
     }
@@
-struct StreamMetricWrapper<'a, const N: usize, T> {
+struct StreamMetricWrapper<'a, T> {
     time: time::Instant,
-    labels: [&'static str; N],
+    provider: &'a str,
+    method: &'static str,
+    status: &'static str,
     inner: BoxStream<'a, T>,
 }
 
-impl<T, const N: usize> Stream for StreamMetricWrapper<'_, N, T> {
+impl<T> Stream for StreamMetricWrapper<'_, T> {
     type Item = T;
@@
-                STORAGE_REQUEST_RESPONSE_TIME
-                    .with_label_values(&self.labels)
-                    .observe(self.time.elapsed().as_secs_f64());
+                STORAGE_REQUEST_RESPONSE_TIME
+                    .with_label_values(&[self.provider, self.method, self.status])
+                    .observe(self.time.elapsed().as_secs_f64());
                 t

Also applies to: 310-319, 402-426

src/storage/azure_blob.rs (1)

763-812: Remove duplicate and inaccurate GET metrics inside get_objects().

Each self.get_object() already records GET metrics. Additionally, using list_start to time GET inflates durations with the loop’s total runtime.

Apply this diff:

-            STORAGE_REQUEST_RESPONSE_TIME
-                .with_label_values(&["azure_blob", "GET", "200"])
-                .observe(list_start.elapsed().as_secs_f64());
-            STORAGE_FILES_SCANNED
-                .with_label_values(&["azure_blob", "GET"])
-                .inc();

Also applies to: 792-799

src/storage/s3.rs (2)

343-365: Avoid panic on GET: handle body read errors and measure full GET latency.

Same issue as Azure: unwrap on resp.bytes() can panic and elapsed is computed before reading the body.

Apply this diff:

     async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
-        let time = std::time::Instant::now();
-        let resp = self.client.get(&to_object_store_path(path)).await;
-        let elapsed = time.elapsed().as_secs_f64();
+        let time = std::time::Instant::now();
+        let resp = self.client.get(&to_object_store_path(path)).await;
         STORAGE_FILES_SCANNED
             .with_label_values(&["s3", "GET"])
             .inc();
-        match resp {
-            Ok(resp) => {
-                let body: Bytes = resp.bytes().await.unwrap();
-                STORAGE_REQUEST_RESPONSE_TIME
-                    .with_label_values(&["s3", "GET", "200"])
-                    .observe(elapsed);
-                Ok(body)
-            }
-            Err(err) => {
-                let status_code = error_to_status_code(&err);
-                STORAGE_REQUEST_RESPONSE_TIME
-                    .with_label_values(&["s3", "GET", status_code])
-                    .observe(elapsed);
-                Err(err.into())
-            }
-        }
+        match resp {
+            Ok(resp) => match resp.bytes().await {
+                Ok(body) => {
+                    let elapsed = time.elapsed().as_secs_f64();
+                    STORAGE_REQUEST_RESPONSE_TIME
+                        .with_label_values(&["s3", "GET", "200"])
+                        .observe(elapsed);
+                    Ok(body)
+                }
+                Err(err) => {
+                    let elapsed = time.elapsed().as_secs_f64();
+                    let status_code = error_to_status_code(&err);
+                    STORAGE_REQUEST_RESPONSE_TIME
+                        .with_label_values(&["s3", "GET", status_code])
+                        .observe(elapsed);
+                    Err(err.into())
+                }
+            },
+            Err(err) => {
+                let elapsed = time.elapsed().as_secs_f64();
+                let status_code = error_to_status_code(&err);
+                STORAGE_REQUEST_RESPONSE_TIME
+                    .with_label_values(&["s3", "GET", status_code])
+                    .observe(elapsed);
+                Err(err.into())
+            }
+        }
     }

874-923: Remove duplicate and inaccurate GET metrics inside get_objects().

Each self.get_object() already records GET metrics. Using list_start for GET inflates durations.

Apply this diff:

-            STORAGE_REQUEST_RESPONSE_TIME
-                .with_label_values(&["s3", "GET", "200"])
-                .observe(list_start.elapsed().as_secs_f64());
-            STORAGE_FILES_SCANNED
-                .with_label_values(&["s3", "GET"])
-                .inc();

Also applies to: 903-909

🧹 Nitpick comments (18)
src/catalog/mod.rs (1)

190-206: Result.map(...).unwrap_or(0) on counter get(): LGTM, but consider unwrap_or_default().

  • Using .map(|m| m.get()).unwrap_or(0) over the prometheus Result is concise and correct with IntCounterVec (u64).
  • Minor nit: unwrap_or_default() reads slightly clearer and avoids literal zeros.

No functional issues.

src/stats.rs (1)

205-221: delete_with_label_prefix now targets IntCounterVec: LGTM, but be aware of cost.

Iterating MetricFamily and removing series by exact label map is correct and avoids guessing dates. It is O(N labels) and fine for admin paths; keep it off hot paths.
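
A rough, self-contained sketch of that iterate-and-remove pattern, assuming the prometheus crate's Collector::collect and label-map remove APIs; the helper name and label layout are illustrative, not the actual stats.rs code:

use std::collections::HashMap;

use prometheus::core::Collector;
use prometheus::{IntCounterVec, Opts};

// Walk the collected metric families of a counter vec and drop every series
// whose "stream" label matches the given stream name.
fn delete_series_for_stream(vec: &IntCounterVec, stream: &str) {
    for family in vec.collect() {
        for metric in family.get_metric() {
            let labels: HashMap<&str, &str> = metric
                .get_label()
                .iter()
                .map(|pair| (pair.get_name(), pair.get_value()))
                .collect();
            if labels.get("stream") == Some(&stream) {
                // `remove` takes the full, exact label map of the series.
                let _ = vec.remove(&labels);
            }
        }
    }
}

fn main() {
    let events = IntCounterVec::new(
        Opts::new("events_ingested_date", "per-stream, per-date event counter"),
        &["stream", "format", "date"],
    )
    .expect("metric can be created");
    events
        .with_label_values(&["app_logs", "json", "2025-08-24"])
        .inc_by(10);
    events
        .with_label_values(&["other_stream", "json", "2025-08-24"])
        .inc_by(3);

    // Drops only the app_logs series; other_stream remains registered.
    delete_series_for_stream(&events, "app_logs");
}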

src/storage/object_storage.rs (1)

148-150: Filename parsing assumes dot-separated convention; consider a defensive fallback (context-aware).

This split/indexing will panic if the filename deviates. Based on the retrieved learning (Parseable’s server guarantees the date=YYYY-MM-DD.hour=HH.minute=MM pattern), this is safe in normal flows. Still, a debug assert or graceful fallback would make this more robust during migrations or manual uploads.

-    let mut file_date_part = filename.split('.').collect::<Vec<&str>>()[0];
-    file_date_part = file_date_part.split('=').collect::<Vec<&str>>()[1];
+    let mut file_date_part = filename.split('.').next().unwrap_or(filename);
+    file_date_part = file_date_part
+        .split('=')
+        .nth(1)
+        .unwrap_or_else(|| {
+            debug_assert!(false, "Unexpected parquet filename format: {filename}");
+            "unknown-date"
+        });
src/query/listing_table_builder.rs (1)

98-109: Best-effort listing instead of hard fail (optional).

A transient failure on one prefix aborts the whole listing. Consider logging and continuing, so older segments still get scanned.

-                Err(e) => {
-                    return Err(DataFusionError::External(Box::new(e)));
-                }
+                Err(e) => {
+                    tracing::warn!("list_dirs_relative failed for {}: {e}", prefix);
+                    continue;
+                }
src/query/stream_schema_provider.rs (2)

569-573: Metric semantics: “files scanned” vs planned parquet count.

You increment STORAGE_FILES_SCANNED[.., "GET"] by the number of parquet files from manifests. These are "planned to scan," not actual GET/HEADs (which DataFusion will perform later). Either move this count to where the I/O happens or rename/use a planning metric to avoid inflating GET counts.


843-866: Duplicate manifest collection logic; unify to one helper.

This is nearly identical to enterprise’s collect_manifest_files (see src/enterprise/utils.rs). Consider centralizing it to avoid divergence in error handling and performance.

src/storage/gcs.rs (2)

665-690: HEAD double-count: STORAGE_FILES_SCANNED incremented twice.

You inc() at Lines 669-671 and again at 677-679 within the Ok branch. Keep only one increment per HEAD to avoid inflation.

-                // Record single file accessed
-                STORAGE_FILES_SCANNED
-                    .with_label_values(&["gcs", "HEAD"])
-                    .inc();

348-386: LIST metrics only capture stream creation time, not stream consumption (optional).

You observe LIST timing immediately after creating the stream. For truer latency, consider measuring from start to end of consumption or supplement with per-chunk timings (you already do that for GET/DELETE). Not critical if consistency across providers is the goal.

src/storage/localfs.rs (3)

133-137: Consider not incrementing HEAD for an unimplemented operation.

Incrementing “files scanned” for HEAD even though it immediately errors skews counts. Either remove it here or add a dedicated “UNSUPPORTED” status dimension (if you want to track attempts).

-        STORAGE_FILES_SCANNED
-            .with_label_values(&["localfs", "HEAD"])
-            .inc();

304-373: Streaming LIST/GET accounting is consistent; minor nit on 404 mapping.

This path reports LIST 200 when the directory exists and per-file GET 200/404 as appropriate. If the directory doesn't exist, you surface LIST 404 and return early, which is fine for now. Consider treating a missing dir as an empty listing to be more permissive, but that's not required.


416-440: DELETE prefix: observe status but no “files scanned” increment (OK).

Given this removes a directory, not a file object, omitting a DELETE counter increment makes sense. If you want parity with object-store providers, you could increment by the number of entries removed (requires walking), but that’s optional.

src/storage/metrics_layer.rs (2)

39-66: Broaden and correct error-to-status mapping.

  • Mapping Generic to 400 likely misclassifies server/transport failures as client errors; 500 is safer.
  • Consider handling other common variants (e.g., InvalidPath -> 400, InvalidRange -> 416, Timeout/DeadlineExceeded -> 504/408) for better fidelity.

Apply this diff (adjust variants per object_store version you use):

 pub fn error_to_status_code(err: &object_store::Error) -> &'static str {
     match err {
-        // 400 Bad Request - Client errors
-        object_store::Error::Generic { .. } => "400",
+        // 400 Bad Request - obvious client errors
+        object_store::Error::InvalidPath { .. } => "400",
+        // 416 Range Not Satisfiable
+        object_store::Error::InvalidRange { .. } => "416",
+        // Treat generic/unknown as server-side
+        object_store::Error::Generic { .. } => "500",
         // 401 Unauthorized - Authentication required
         object_store::Error::Unauthenticated { .. } => "401",
         // 404 Not Found - Resource doesn't exist
         object_store::Error::NotFound { .. } => "404",
         // 409 Conflict - Resource already exists
         object_store::Error::AlreadyExists { .. } => "409",
         // 412 Precondition Failed - If-Match, If-None-Match, etc. failed
         object_store::Error::Precondition { .. } => "412",
         // 304 Not Modified
         object_store::Error::NotModified { .. } => "304",
         // 501 Not Implemented - Feature not supported
         object_store::Error::NotSupported { .. } => "501",
+        // 504 Gateway Timeout (or choose 408 Request Timeout)
+        object_store::Error::Timeout { .. } => "504",
         // 500 Internal Server Error - All other errors
         _ => "500",
     }
 }

If the exact variant names differ in your object_store version, I can tailor this.


369-399: Standardize method labels for conditional ops.

Use operation names that match method names to ease querying and dashboards.

Apply this diff:

-            .with_label_values(&[&self.provider, "COPY_IF", status])
+            .with_label_values(&[&self.provider, "COPY_IF_NOT_EXISTS", status])
@@
-            .with_label_values(&[&self.provider, "RENAME_IF", status])
+            .with_label_values(&[&self.provider, "RENAME_IF_NOT_EXISTS", status])
src/storage/azure_blob.rs (2)

723-746: HEAD files-scanned should increment regardless of success.

Other paths (e.g., check) count attempts, not just successes. Increment once per HEAD call to keep semantics consistent.

Apply this diff:

-        match &result {
+        match &result {
             Ok(_) => {
                 STORAGE_REQUEST_RESPONSE_TIME
                     .with_label_values(&["azure_blob", "HEAD", "200"])
                     .observe(head_elapsed);
-                // Record single file accessed
-                STORAGE_FILES_SCANNED
-                    .with_label_values(&["azure_blob", "HEAD"])
-                    .inc();
             }
             Err(err) => {
                 let status_code = error_to_status_code(err);
                 STORAGE_REQUEST_RESPONSE_TIME
                     .with_label_values(&["azure_blob", "HEAD", status_code])
                     .observe(head_elapsed);
             }
         }
 
-        Ok(result?)
+        // Count the attempt once
+        STORAGE_FILES_SCANNED
+            .with_label_values(&["azure_blob", "HEAD"])
+            .inc();
+        Ok(result?)

1006-1056: Consider counting HEAD attempts in list_old_streams() checks.

You record HEAD latency per stream but don’t increment STORAGE_FILES_SCANNED for HEAD. Add a single increment per attempt inside the task.

Apply this diff:

             let task = async move {
                 let head_start = Instant::now();
                 let result = self.client.head(&StorePath::from(key)).await;
                 let head_elapsed = head_start.elapsed().as_secs_f64();
 
                 match &result {
                     Ok(_) => {
                         STORAGE_REQUEST_RESPONSE_TIME
                             .with_label_values(&["azure_blob", "HEAD", "200"])
                             .observe(head_elapsed);
                     }
                     Err(err) => {
                         let status_code = error_to_status_code(err);
                         STORAGE_REQUEST_RESPONSE_TIME
                             .with_label_values(&["azure_blob", "HEAD", status_code])
                             .observe(head_elapsed);
                     }
                 }
+                STORAGE_FILES_SCANNED
+                    .with_label_values(&["azure_blob", "HEAD"])
+                    .inc();
 
                 result.map(|_| ())
             };
src/storage/s3.rs (2)

666-686: Standardize multipart INIT label to match Azure/GCS.

Use PUT_MULTIPART_INIT for the initiation step for consistency across providers.

Apply this diff:

-        let mut async_writer = match async_writer {
+        let mut async_writer = match async_writer {
             Ok(writer) => {
                 STORAGE_REQUEST_RESPONSE_TIME
-                    .with_label_values(&["s3", "PUT_MULTIPART", "200"])
+                    .with_label_values(&["s3", "PUT_MULTIPART_INIT", "200"])
                     .observe(multipart_elapsed);
                 writer
             }
             Err(err) => {
                 let status_code = error_to_status_code(&err);
                 STORAGE_REQUEST_RESPONSE_TIME
-                    .with_label_values(&["s3", "PUT_MULTIPART", status_code])
+                    .with_label_values(&["s3", "PUT_MULTIPART_INIT", status_code])
                     .observe(multipart_elapsed);
                 return Err(err.into());
             }
         };

Also applies to: 672-676


1117-1167: Consider counting HEAD attempts in list_old_streams() checks.

Latency is recorded per HEAD, but files-scanned is not incremented.

Apply this diff:

             let task = async move {
                 let head_start = Instant::now();
                 let result = self.client.head(&StorePath::from(key)).await;
                 let head_elapsed = head_start.elapsed().as_secs_f64();
@@
                 }
-                result.map(|_| ())
+                STORAGE_FILES_SCANNED
+                    .with_label_values(&["s3", "HEAD"])
+                    .inc();
+                result.map(|_| ())
             };

Also applies to: 1140-1159

src/metrics/storage.rs (1)

25-33: Global histogram looks good; consider explicit buckets.

Default buckets may not fit object-store latencies. Consider Prometheus-style buckets for sub-ms to tens-of-seconds.

Apply this diff:

 pub static STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
-    HistogramVec::new(
-        HistogramOpts::new("storage_request_response_time", "Storage Request Latency")
-            .namespace(METRICS_NAMESPACE),
-        &["provider", "method", "status"],
-    )
+    HistogramVec::new(
+        HistogramOpts::new("storage_request_response_time", "Storage Request Latency")
+            .namespace(METRICS_NAMESPACE)
+            .buckets(vec![
+                0.005, 0.01, 0.025, 0.05, 0.1,
+                0.25, 0.5, 1.0, 2.5, 5.0,
+                10.0, 30.0, 60.0,
+            ]),
+        &["provider", "method", "status"],
+    )
     .expect("metric can be created")
 });
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 0e35b07 and 6850fb2.

📒 Files selected for processing (14)
  • src/catalog/mod.rs (1 hunks)
  • src/metadata.rs (3 hunks)
  • src/metrics/mod.rs (6 hunks)
  • src/metrics/storage.rs (1 hunks)
  • src/query/listing_table_builder.rs (2 hunks)
  • src/query/mod.rs (1 hunks)
  • src/query/stream_schema_provider.rs (12 hunks)
  • src/stats.rs (3 hunks)
  • src/storage/azure_blob.rs (24 hunks)
  • src/storage/gcs.rs (24 hunks)
  • src/storage/localfs.rs (15 hunks)
  • src/storage/metrics_layer.rs (8 hunks)
  • src/storage/object_storage.rs (2 hunks)
  • src/storage/s3.rs (24 hunks)
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-08-20T17:01:25.791Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1409
File: src/storage/field_stats.rs:429-456
Timestamp: 2025-08-20T17:01:25.791Z
Learning: In Parseable's field stats calculation (src/storage/field_stats.rs), the extract_datetime_from_parquet_path_regex function correctly works with filename-only parsing because Parseable's server-side filename generation guarantees the dot-separated format date=YYYY-MM-DD.hour=HH.minute=MM pattern in parquet filenames.

Applied to files:

  • src/storage/object_storage.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.

Applied to files:

  • src/storage/gcs.rs
  • src/storage/azure_blob.rs
  • src/storage/s3.rs
📚 Learning: 2025-08-21T14:41:55.462Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1410
File: src/storage/object_storage.rs:876-916
Timestamp: 2025-08-21T14:41:55.462Z
Learning: In Parseable's object storage system (src/storage/object_storage.rs), date directories (date=YYYY-MM-DD) are only created when there's actual data to store, which means they will always contain corresponding hour and minute subdirectories. There can be no case where a date directory exists without hour or minute subdirectories.

Applied to files:

  • src/storage/gcs.rs
  • src/storage/azure_blob.rs
  • src/storage/localfs.rs
  • src/storage/s3.rs
📚 Learning: 2025-08-21T11:47:01.279Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1410
File: src/storage/object_storage.rs:0-0
Timestamp: 2025-08-21T11:47:01.279Z
Learning: In Parseable's object storage implementation (src/storage/object_storage.rs), the hour and minute directory prefixes (hour=XX, minute=YY) are generated from arrow file timestamps following proper datetime conventions, so they are guaranteed to be within valid ranges (0-23 for hours, 0-59 for minutes) and don't require additional range validation.

Applied to files:

  • src/storage/gcs.rs
  • src/storage/azure_blob.rs
  • src/storage/s3.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/query/stream_schema_provider.rs
🧬 Code graph analysis (9)
src/catalog/mod.rs (1)
src/stats.rs (1)
  • event_labels (223-225)
src/metadata.rs (1)
src/catalog/mod.rs (4)
  • num_rows (61-61)
  • num_rows (78-80)
  • ingestion_size (58-58)
  • ingestion_size (70-72)
src/storage/gcs.rs (3)
src/storage/metrics_layer.rs (3)
  • error_to_status_code (40-66)
  • new (75-80)
  • head (255-269)
src/storage/object_storage.rs (3)
  • parseable_json_path (1281-1283)
  • new (82-91)
  • head (209-209)
src/storage/s3.rs (3)
  • from (1345-1353)
  • from (1357-1359)
  • head (831-857)
src/storage/azure_blob.rs (2)
src/storage/metrics_layer.rs (3)
  • error_to_status_code (40-66)
  • new (75-80)
  • head (255-269)
src/storage/gcs.rs (12)
  • resp (296-302)
  • resp (420-426)
  • resp (1130-1135)
  • resp (1164-1169)
  • _delete_prefix (233-282)
  • _list_streams (284-346)
  • name (126-128)
  • _list_dates (348-386)
  • _upload_file (469-494)
  • head (664-690)
  • get_ingestor_meta_file_paths (758-793)
  • check (885-911)
src/storage/localfs.rs (5)
src/storage/object_storage.rs (7)
  • get_ingestor_meta_file_paths (339-341)
  • delete_prefix (227-227)
  • delete_object (338-338)
  • check (228-228)
  • delete_stream (229-229)
  • try_delete_node_meta (346-346)
  • list_streams (230-230)
src/storage/metrics_layer.rs (2)
  • delete_stream (287-292)
  • copy (337-351)
src/storage/azure_blob.rs (7)
  • get_ingestor_meta_file_paths (814-849)
  • delete_prefix (909-913)
  • delete_object (915-939)
  • check (941-967)
  • delete_stream (969-973)
  • try_delete_node_meta (975-1000)
  • list_streams (1002-1004)
src/storage/gcs.rs (7)
  • get_ingestor_meta_file_paths (758-793)
  • delete_prefix (853-857)
  • delete_object (859-883)
  • check (885-911)
  • delete_stream (913-917)
  • try_delete_node_meta (919-944)
  • list_streams (946-948)
src/storage/s3.rs (9)
  • get_ingestor_meta_file_paths (925-960)
  • from (1345-1353)
  • from (1357-1359)
  • delete_prefix (1020-1024)
  • delete_object (1026-1050)
  • check (1052-1078)
  • delete_stream (1080-1084)
  • try_delete_node_meta (1086-1111)
  • list_streams (1113-1115)
src/storage/s3.rs (3)
src/storage/metrics_layer.rs (3)
  • error_to_status_code (40-66)
  • new (75-80)
  • head (255-269)
src/storage/object_storage.rs (4)
  • parseable_json_path (1281-1283)
  • new (82-91)
  • name (200-200)
  • head (209-209)
src/storage/azure_blob.rs (8)
  • resp (331-337)
  • resp (455-461)
  • resp (1190-1195)
  • resp (1224-1229)
  • _delete_prefix (268-317)
  • _list_streams (319-381)
  • name (165-167)
  • head (723-746)
src/metrics/storage.rs (1)
src/storage/metrics_layer.rs (1)
  • new (75-80)
src/storage/metrics_layer.rs (3)
src/storage/azure_blob.rs (1)
  • head (723-746)
src/storage/gcs.rs (1)
  • head (664-690)
src/storage/s3.rs (1)
  • head (831-857)
src/query/stream_schema_provider.rs (3)
src/parseable/mod.rs (4)
  • storage (244-246)
  • new (146-158)
  • serde_json (300-300)
  • serde_json (306-306)
src/enterprise/utils.rs (1)
  • collect_manifest_files (159-182)
src/storage/s3.rs (2)
  • from (1345-1353)
  • from (1357-1359)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
🔇 Additional comments (17)
src/metrics/mod.rs (3)

268-276: Registry updates for new totals: LGTM.

The new TOTAL_*_DATE metrics are registered. Once labels include stream (see prior comment), this remains correct.


33-33: Help text clarifications: LGTM.

Descriptions now explicitly say “for a stream”, improving clarity and aligning with label cardinality.

Also applies to: 41-46, 53-53, 61-61, 69-75, 83-83, 93-97, 107-107, 119-119


127-137: The script will print the relevant section for review. Please share the output so we can verify whether the catalog module correctly casts metrics to u64.

src/query/mod.rs (1)

555-566: The script will dump the collect_manifest_files implementation so we can verify how it handles absolute URLs.

src/stats.rs (1)

25-25: Imports for counters and TOTAL_* metrics: LGTM.

Prepares this module to manage per-date counters and totals.

Also applies to: 32-34

src/metadata.rs (1)

49-57: Counters: correct shift to inc_by for per-date metrics.

  • update_stats and load_daily_metrics now use inc_by(...) for per-date counters. This matches the new metric types and avoids set(...) races.

One caution: load_daily_metrics should be called only once per process start (or after clearing/removing label series) to avoid double accumulation.

Provide the call chain where load_daily_metrics is invoked to ensure idempotent usage.

Also applies to: 181-189
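
If single invocation needs to be enforced rather than assumed, a std::sync::Once guard is one minimal option. This is purely a hypothetical sketch; the existing call chain may already guarantee one-time loading:

use std::sync::Once;

static DAILY_METRICS_LOADED: Once = Once::new();

// Even if the surrounding init path runs again, the per-date counters are
// only accumulated a single time.
fn load_daily_metrics_once(load: impl FnOnce()) {
    DAILY_METRICS_LOADED.call_once(load);
}

fn main() {
    load_daily_metrics_once(|| println!("accumulating per-date counters from manifests"));
    // Second call is a no-op, so counters are not double-counted.
    load_daily_metrics_once(|| println!("this will not run"));
}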

src/storage/object_storage.rs (1)

158-166: Metric type consistency: counter vs gauge.

EVENTS_STORAGE_SIZE_DATE switched to inc_by (counter), while STORAGE_SIZE/LIFETIME_EVENTS_STORAGE_SIZE still use .add() (gauge). If that’s intentional (lifetime and current gauges vs per-date counters), all good. Please confirm EVENTS_STORAGE_SIZE_DATE is now an IntCounterVec or this won’t compile.

src/query/stream_schema_provider.rs (1)

281-289: API shift to storage-only listing looks good.

The migration to storage-based listing and removal of the ObjectStore dependency keeps concerns localized. No issues spotted here.

src/storage/localfs.rs (3)

148-176: GET instrumentation looks solid; status mapping covers 200/404/500.

Good balance between observability and behavior: no panics, and single-file GET increments are accurate.


389-408: PUT metrics and single-file count are correct.

Instrumentation patterns align with the rest of the providers.


557-602: list_streams observability matches provider patterns; LGTM.

The LIST status timing and error-to-status mapping are consistent. No issues spotted.

src/storage/metrics_layer.rs (2)

71-80: Provider label addition looks good.

Storing provider on MetricLayer and extending new(inner, provider) is the right move for consistent tagging.


96-110: Instrumentation approach is clean and consistent.

Measuring elapsed time per call, mapping to status, and returning inner results directly keeps behavior unchanged while improving observability.

Also applies to: 118-131, 154-167, 172-185, 187-201, 203-217, 219-233, 235-253, 255-269, 271-285, 321-335, 337-351, 353-367, 369-383, 385-399

src/storage/azure_blob.rs (2)

245-265: PUT metrics and counting look good.

Timing and status labeling are correct and the files-scanned counter is incremented once per PUT attempt.


267-316: Instrumentation and counters look consistent across operations.

Timing + provider/method/status labels are applied consistently; per-operation files-scanned counters are incremented appropriately.

Also applies to: 319-381, 383-421, 423-502, 504-529, 915-939, 941-967, 977-999, 1069-1095, 1103-1129, 1169-1235

src/storage/s3.rs (1)

372-392: Instrumentation is broadly consistent and aligns with PR goals.

Per-operation timings, provider/method/status labeling, and files-scanned counters are applied as expected.

Also applies to: 395-444, 446-508, 510-548, 550-629, 631-656, 693-786, 788-821, 925-960, 962-1006, 1026-1050, 1052-1078, 1086-1111, 1179-1241, 1277-1295, 1311-1329

src/metrics/storage.rs (1)

35-45: Files-scanned counter is a good addition.

Labeling by [provider, operation] should make the dashboards for “files touched” straightforward.

Comment on lines +90 to +111
// Build all prefixes as relative paths
let prefixes: Vec<_> = prefixes
.into_iter()
.map(|prefix| {
relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix))
})
.collect();

// Use storage.list_dirs_relative for all prefixes and flatten results
let mut listing = Vec::new();
for prefix in prefixes {
let path = relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix));
let prefix = storage.absolute_url(path.as_relative_path()).to_string();
if let Some(pos) = prefix.rfind("minute") {
let hour_prefix = &prefix[..pos];
minute_resolve
.entry(hour_prefix.to_owned())
.or_default()
.push(prefix);
} else {
all_resolve.push(prefix);
match storage.list_dirs_relative(&prefix).await {
Ok(paths) => {
listing.extend(paths.into_iter().map(|p| p.to_string()));
}
Err(e) => {
return Err(DataFusionError::External(Box::new(e)));
}
}
}

/// Resolve all prefixes asynchronously and collect the object metadata.
type ResolveFuture =
Pin<Box<dyn Future<Output = Result<Vec<ObjectMeta>, object_store::Error>> + Send>>;
let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();
for (listing_prefix, prefixes) in minute_resolve {
let client = Arc::clone(&client);
tasks.push(Box::pin(async move {
let path = Path::from(listing_prefix);
let mut objects = client.list(Some(&path)).try_collect::<Vec<_>>().await?;

objects.retain(|obj| {
prefixes.iter().any(|prefix| {
obj.location
.prefix_matches(&object_store::path::Path::from(prefix.as_ref()))
})
});

Ok(objects)
}));
}

for prefix in all_resolve {
let client = Arc::clone(&client);
tasks.push(Box::pin(async move {
client
.list(Some(&object_store::path::Path::from(prefix)))
.try_collect::<Vec<_>>()
.await
}));
}

let listing = tasks
.try_collect::<Vec<Vec<ObjectMeta>>>()
.await
.map_err(|err| DataFusionError::External(Box::new(err)))?
.into_iter()
.flat_map(|res| {
res.into_iter()
.map(|obj| obj.location.to_string())
.collect::<Vec<String>>()
})
.sorted()
.rev()
.collect_vec();
let listing = listing.into_iter().sorted().rev().collect_vec();

⚠️ Potential issue

Listing paths lose their parent prefix; this produces invalid URIs for query_prefixes.

You collect child names (e.g., “minute=00”) from list_dirs_relative(prefix) but push them directly into listing. query_prefixes then renders gs://bucket/{child}, missing stream/date=/hour=. Join the original prefix with the child.

-        // Use storage.list_dirs_relative for all prefixes and flatten results
-        let mut listing = Vec::new();
-        for prefix in prefixes {
-            match storage.list_dirs_relative(&prefix).await {
-                Ok(paths) => {
-                    listing.extend(paths.into_iter().map(|p| p.to_string()));
-                }
-                Err(e) => {
-                    return Err(DataFusionError::External(Box::new(e)));
-                }
-            }
-        }
+        // Use storage.list_dirs_relative for all prefixes and flatten results,
+        // preserving the full path
+        let mut listing = Vec::new();
+        for prefix in prefixes {
+            let base = prefix.as_str().to_owned();
+            match storage.list_dirs_relative(&prefix).await {
+                Ok(children) => {
+                    listing.extend(children.into_iter().map(|c| format!("{}/{}", base, c)));
+                }
+                Err(e) => {
+                    return Err(DataFusionError::External(Box::new(e)));
+                }
+            }
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Build all prefixes as relative paths
let prefixes: Vec<_> = prefixes
.into_iter()
.map(|prefix| {
relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix))
})
.collect();
// Use storage.list_dirs_relative for all prefixes and flatten results
let mut listing = Vec::new();
for prefix in prefixes {
let path = relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix));
let prefix = storage.absolute_url(path.as_relative_path()).to_string();
if let Some(pos) = prefix.rfind("minute") {
let hour_prefix = &prefix[..pos];
minute_resolve
.entry(hour_prefix.to_owned())
.or_default()
.push(prefix);
} else {
all_resolve.push(prefix);
match storage.list_dirs_relative(&prefix).await {
Ok(paths) => {
listing.extend(paths.into_iter().map(|p| p.to_string()));
}
Err(e) => {
return Err(DataFusionError::External(Box::new(e)));
}
}
}
/// Resolve all prefixes asynchronously and collect the object metadata.
type ResolveFuture =
Pin<Box<dyn Future<Output = Result<Vec<ObjectMeta>, object_store::Error>> + Send>>;
let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();
for (listing_prefix, prefixes) in minute_resolve {
let client = Arc::clone(&client);
tasks.push(Box::pin(async move {
let path = Path::from(listing_prefix);
let mut objects = client.list(Some(&path)).try_collect::<Vec<_>>().await?;
objects.retain(|obj| {
prefixes.iter().any(|prefix| {
obj.location
.prefix_matches(&object_store::path::Path::from(prefix.as_ref()))
})
});
Ok(objects)
}));
}
for prefix in all_resolve {
let client = Arc::clone(&client);
tasks.push(Box::pin(async move {
client
.list(Some(&object_store::path::Path::from(prefix)))
.try_collect::<Vec<_>>()
.await
}));
}
let listing = tasks
.try_collect::<Vec<Vec<ObjectMeta>>>()
.await
.map_err(|err| DataFusionError::External(Box::new(err)))?
.into_iter()
.flat_map(|res| {
res.into_iter()
.map(|obj| obj.location.to_string())
.collect::<Vec<String>>()
})
.sorted()
.rev()
.collect_vec();
let listing = listing.into_iter().sorted().rev().collect_vec();
// Build all prefixes as relative paths
let prefixes: Vec<_> = prefixes
.into_iter()
.map(|prefix| {
relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix))
})
.collect();
// Use storage.list_dirs_relative for all prefixes and flatten results,
// preserving the full path
let mut listing = Vec::new();
for prefix in prefixes {
let base = prefix.as_str().to_owned();
match storage.list_dirs_relative(&prefix).await {
Ok(children) => {
listing.extend(children.into_iter().map(|c| format!("{}/{}", base, c)));
}
Err(e) => {
return Err(DataFusionError::External(Box::new(e)));
}
}
}
let listing = listing.into_iter().sorted().rev().collect_vec();
🤖 Prompt for AI Agents
In src/query/listing_table_builder.rs around lines 90 to 111, the code currently
pushes only child names returned from storage.list_dirs_relative(&prefix) into
listing, which loses the parent prefix and results in invalid URIs; change the
push to join each child path with its original prefix (preserving relative path
semantics) before adding to listing (e.g., use the prefix to join/concatenate
the child path or call a RelativePathBuf join method), keep error handling
as-is, then sort/reverse as before.

Comment on lines +150 to 154
let compressed_size = path
.metadata()
.map(|m| m.len())
.map_err(|e| ObjectStorageError::Custom(format!("metadata failed for {filename}: {e}")))?;
STORAGE_SIZE

⚠️ Potential issue

Don’t fail the upload flow if local metadata read fails.

If metadata() errors here, the whole upload path fails after the object has already been uploaded, leaving a pushed object without a manifest and the staged file retained. Metrics should be best-effort; log and continue with 0 or move metrics after manifest creation.

-    let compressed_size = path
-        .metadata()
-        .map(|m| m.len())
-        .map_err(|e| ObjectStorageError::Custom(format!("metadata failed for {filename}: {e}")))?;
+    let compressed_size = match path.metadata().map(|m| m.len()) {
+        Ok(len) => len,
+        Err(e) => {
+            warn!("metadata() failed for {filename}: {e}; skipping size-based metrics");
+            0
+        }
+    };

Optionally, move metrics after manifest creation and wrap in a non-fatal block.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let compressed_size = path
.metadata()
.map(|m| m.len())
.map_err(|e| ObjectStorageError::Custom(format!("metadata failed for {filename}: {e}")))?;
STORAGE_SIZE
let compressed_size = match path.metadata().map(|m| m.len()) {
Ok(len) => len,
Err(e) => {
warn!("metadata() failed for {filename}: {e}; skipping size-based metrics");
0
}
};
STORAGE_SIZE
🤖 Prompt for AI Agents
In src/storage/object_storage.rs around lines 150-154, the current call to
path.metadata().map(...).map_err(...) aborts the upload if metadata() fails;
change this so metadata failures are non-fatal: either attempt to read metadata
but on Err log the error and set compressed_size to 0 (or None) and continue, or
move the metrics/metadata computation to after manifest creation and run it in a
non-fatal block that logs errors without returning Err. Ensure you do not
propagate ObjectStorageError for metadata failures so the upload/manifest flow
completes and staged file cleanup still occurs.

@coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/query/stream_schema_provider.rs (1)

587-596: Avoid unwrap on ObjectStoreUrl::parse to prevent rare panics

If storage.store_url() is ever malformed (misconfiguration), unwrap will panic the query path. Map to a DataFusion error instead.

Apply this diff:

-            ObjectStoreUrl::parse(storage.store_url()).unwrap(),
+            ObjectStoreUrl::parse(storage.store_url())
+                .map_err(|e| DataFusionError::External(Box::new(e)))?,
src/storage/metrics_layer.rs (1)

295-303: Fix label cardinality bug in StreamMetricWrapper (2 labels supplied to a 3-label histogram).

STORAGE_REQUEST_RESPONSE_TIME expects 3 labels: provider, operation, status. The StreamMetricWrapper supplies only ["LIST","200"], causing a runtime label-cardinality panic and losing metrics for streaming list operations.

Refactor wrapper to carry provider and emit 3 labels:

@@
-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
         let time = time::Instant::now();
         let inner = self.inner.list(prefix);
-        let res = StreamMetricWrapper {
-            time,
-            labels: ["LIST", "200"],
-            inner,
-        };
+        let res = StreamMetricWrapper {
+            time,
+            provider: &self.provider,
+            op: "LIST",
+            status: "200",
+            inner,
+        };
         Box::pin(res)
     }
@@
-    fn list_with_offset(
+    fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
     ) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
         let time = time::Instant::now();
         let inner = self.inner.list_with_offset(prefix, offset);
-        let res = StreamMetricWrapper {
-            time,
-            labels: ["LIST_OFFSET", "200"],
-            inner,
-        };
+        let res = StreamMetricWrapper {
+            time,
+            provider: &self.provider,
+            op: "LIST_OFFSET",
+            status: "200",
+            inner,
+        };
 
         Box::pin(res)
     }
@@
-struct StreamMetricWrapper<'a, const N: usize, T> {
-    time: time::Instant,
-    labels: [&'static str; N],
-    inner: BoxStream<'a, T>,
-}
+struct StreamMetricWrapper<'a, T> {
+    time: time::Instant,
+    provider: &'a str,
+    op: &'static str,
+    status: &'static str,
+    inner: BoxStream<'a, T>,
+}
@@
-impl<T, const N: usize> Stream for StreamMetricWrapper<'_, N, T> {
+impl<T> Stream for StreamMetricWrapper<'_, T> {
     type Item = T;
@@
-                STORAGE_REQUEST_RESPONSE_TIME
-                    .with_label_values(&self.labels)
-                    .observe(self.time.elapsed().as_secs_f64());
+                STORAGE_REQUEST_RESPONSE_TIME
+                    .with_label_values(&[self.provider, self.op, self.status])
+                    .observe(self.time.elapsed().as_secs_f64());
                 t

Also applies to: 305-320, 402-426

src/storage/gcs.rs (1)

760-775: Remove duplicate GET timing and counting from get_objects loop.

You already time GETs and increment GET files-scanned inside _get_object(). The loop here re-records GET using the list_start timer and increments GET counts again, inflating numbers and mis-measuring latency.

-            STORAGE_REQUEST_RESPONSE_TIME
-                .with_label_values(&["gcs", "GET", "200"])
-                .observe(list_start.elapsed().as_secs_f64());
-            STORAGE_FILES_SCANNED
-                .with_label_values(&["gcs", "GET"])
-                .inc();
-            STORAGE_FILES_SCANNED_DATE
-                .with_label_values(&["gcs", "GET", &Utc::now().date_naive().to_string()])
-                .inc();
♻️ Duplicate comments (7)
src/storage/gcs.rs (3)

559-577: Consistency: count small-file PUTs in multipart path.

In the <5MB branch you perform a single PUT but don’t bump files-scanned. Align with other PUT paths.

             let put_start = Instant::now();
             let result = self.client.put(location, data.into()).await;
             let put_elapsed = put_start.elapsed().as_secs_f64();
+            STORAGE_FILES_SCANNED
+                .with_label_values(&["gcs", "PUT"])
+                .inc();
+            STORAGE_FILES_SCANNED_DATE
+                .with_label_values(&["gcs", "PUT", &Utc::now().date_naive().to_string()])
+                .inc();

185-207: Avoid panic and measure full GET latency (don’t unwrap body, record after bytes()).

resp.bytes().await.unwrap() can panic, and you record latency before the body read. Record GET timing after the body is fully read; map body-read errors to status codes.

     async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
-        let time = std::time::Instant::now();
-        let resp = self.client.get(&to_object_store_path(path)).await;
-        let elapsed = time.elapsed().as_secs_f64();
+        let time = std::time::Instant::now();
+        let resp = self.client.get(&to_object_store_path(path)).await;
         STORAGE_FILES_SCANNED
             .with_label_values(&["gcs", "GET"])
             .inc();
         STORAGE_FILES_SCANNED_DATE
             .with_label_values(&["gcs", "GET", &Utc::now().date_naive().to_string()])
             .inc();
         match resp {
-            Ok(resp) => {
-                let body: Bytes = resp.bytes().await.unwrap();
-                STORAGE_REQUEST_RESPONSE_TIME
-                    .with_label_values(&["gcs", "GET", "200"])
-                    .observe(elapsed);
-                Ok(body)
-            }
+            Ok(resp) => match resp.bytes().await {
+                Ok(body) => {
+                    let elapsed = time.elapsed().as_secs_f64();
+                    STORAGE_REQUEST_RESPONSE_TIME
+                        .with_label_values(&["gcs", "GET", "200"])
+                        .observe(elapsed);
+                    Ok(body)
+                }
+                Err(err) => {
+                    let elapsed = time.elapsed().as_secs_f64();
+                    let status_code = error_to_status_code(&err);
+                    STORAGE_REQUEST_RESPONSE_TIME
+                        .with_label_values(&["gcs", "GET", status_code])
+                        .observe(elapsed);
+                    Err(err.into())
+                }
+            },
             Err(err) => {
-                let status_code = error_to_status_code(&err);
-                STORAGE_REQUEST_RESPONSE_TIME
-                    .with_label_values(&["gcs", "GET", status_code])
-                    .observe(elapsed);
+                let elapsed = time.elapsed().as_secs_f64();
+                let status_code = error_to_status_code(&err);
+                STORAGE_REQUEST_RESPONSE_TIME
+                    .with_label_values(&["gcs", "GET", status_code])
+                    .observe(elapsed);
                 Err(err.into())
             }
         }
     }

256-276: Count deletes on success only; move increment after successful delete.

files_deleted is incremented before attempting the delete; this overcounts when deletes fail. Increment only on Ok(_).

-                    Ok(obj) => {
-                        files_deleted.fetch_add(1, Ordering::Relaxed);
+                    Ok(obj) => {
                         let delete_start = Instant::now();
                         let delete_resp = self.client.delete(&obj.location).await;
                         let delete_elapsed = delete_start.elapsed().as_secs_f64();
                         match delete_resp {
                             Ok(_) => {
+                                files_deleted.fetch_add(1, Ordering::Relaxed);
                                 STORAGE_REQUEST_RESPONSE_TIME
                                     .with_label_values(&["gcs", "DELETE", "200"])
                                     .observe(delete_elapsed);
src/storage/azure_blob.rs (3)

594-617: Consistency: count small-file PUTs in multipart path.

Add STORAGE_FILES_SCANNED increments for the single PUT path to match other PUT flows.

             let put_start = Instant::now();
             let result = self.client.put(location, data.into()).await;
             let put_elapsed = put_start.elapsed().as_secs_f64();
+            STORAGE_FILES_SCANNED
+                .with_label_values(&["azure_blob", "PUT"])
+                .inc();
+            STORAGE_FILES_SCANNED_DATE
+                .with_label_values(&["azure_blob", "PUT", &Utc::now().date_naive().to_string()])
+                .inc();

220-244: Avoid panic and measure full GET latency (don’t unwrap body, record after bytes()).

Same issue as GCS: unwrap can panic; also you measure elapsed before reading the body. Record GET timing after bytes() and handle errors.

     async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
-        let time = std::time::Instant::now();
-        let resp = self.client.get(&to_object_store_path(path)).await;
-        let elapsed = time.elapsed().as_secs_f64();
+        let time = std::time::Instant::now();
+        let resp = self.client.get(&to_object_store_path(path)).await;
         STORAGE_FILES_SCANNED
             .with_label_values(&["azure_blob", "GET"])
             .inc();
         STORAGE_FILES_SCANNED_DATE
             .with_label_values(&["azure_blob", "GET", &Utc::now().date_naive().to_string()])
             .inc();
-        match resp {
-            Ok(resp) => {
-                let body: Bytes = resp.bytes().await.unwrap();
-                STORAGE_REQUEST_RESPONSE_TIME
-                    .with_label_values(&["azure_blob", "GET", "200"])
-                    .observe(elapsed);
-                Ok(body)
-            }
+        match resp {
+            Ok(resp) => match resp.bytes().await {
+                Ok(body) => {
+                    let elapsed = time.elapsed().as_secs_f64();
+                    STORAGE_REQUEST_RESPONSE_TIME
+                        .with_label_values(&["azure_blob", "GET", "200"])
+                        .observe(elapsed);
+                    Ok(body)
+                }
+                Err(err) => {
+                    let elapsed = time.elapsed().as_secs_f64();
+                    let status_code = error_to_status_code(&err);
+                    STORAGE_REQUEST_RESPONSE_TIME
+                        .with_label_values(&["azure_blob", "GET", status_code])
+                        .observe(elapsed);
+                    Err(err.into())
+                }
+            },
             Err(err) => {
-                let status_code = error_to_status_code(&err);
-                STORAGE_REQUEST_RESPONSE_TIME
-                    .with_label_values(&["azure_blob", "GET", status_code])
-                    .observe(elapsed);
+                let elapsed = time.elapsed().as_secs_f64();
+                let status_code = error_to_status_code(&err);
+                STORAGE_REQUEST_RESPONSE_TIME
+                    .with_label_values(&["azure_blob", "GET", status_code])
+                    .observe(elapsed);
                 Err(err.into())
             }
         }
     }

292-312: Count deletes on success only; move increment after successful delete.

files_deleted is incremented before the delete attempt. Increment only when delete returns Ok(_).

-                    Ok(obj) => {
-                        files_deleted.fetch_add(1, Ordering::Relaxed);
+                    Ok(obj) => {
                         let delete_start = Instant::now();
                         let delete_resp = self.client.delete(&obj.location).await;
                         let delete_elapsed = delete_start.elapsed().as_secs_f64();
                         match delete_resp {
                             Ok(_) => {
+                                files_deleted.fetch_add(1, Ordering::Relaxed);
                                 STORAGE_REQUEST_RESPONSE_TIME
                                     .with_label_values(&["azure_blob", "DELETE", "200"])
                                     .observe(delete_elapsed);
src/storage/s3.rs (1)

827-835: HEAD “files scanned” double-count from previous review is now resolved

Increment occurs exactly once per HEAD attempt and not again in the Ok branch. This aligns with the guidance in the prior review comment.

Also applies to: 865-874

🧹 Nitpick comments (17)
src/query/stream_schema_provider.rs (7)

327-401: Avoid over-partitioning when the file list is small

Using all CPUs for partition fan-out can create a lot of empty file groups when there are only a few files, with minor scheduling overhead. Cap partitions by the number of files while keeping a floor of 1.

Apply this diff:

-        let target_partition: usize = num_cpus::get();
+        let cpu = num_cpus::get();
+        // Avoid many empty partitions when the manifest file list is small
+        let target_partition: usize = std::cmp::max(1, std::cmp::min(cpu, manifest_files.len()));

474-516: Prefer preserving the original storage error instead of stringifying it

Today get_object_store_format errors are mapped to DataFusionError::Plan with err.to_string(). Consider carrying the source error to keep context (provider, status, etc.) intact.

Apply this diff:

-            .await
-            .map_err(|err| DataFusionError::Plan(err.to_string()))?;
+            .await
+            .map_err(|err| DataFusionError::External(Box::new(err)))?;

520-536: Nit: fix variable name typo for readability (listing_time_fiters → listing_time_filters)

Small readability tweak; avoids future confusion.

Apply this diff:

-        if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) {
-            let listing_time_fiters =
-                return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters);
+        if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) {
+            let listing_time_filters =
+                return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters);
 
-            if let Some(listing_time_filter) = listing_time_fiters {
+            if let Some(listing_time_filter) = listing_time_filters {

572-583: Files-scanned counters: confirm intended semantics and avoid potential double-counting

Incrementing STORAGE_FILES_SCANNED(_DATE) by the number of parquet files in the plan captures DataFusion-driven GETs that bypass the storage abstraction. That’s useful, but note:

  • Planning may overcount if pruning/early-limit prevents opening some files.
  • If we later wrap DataFusion’s object_store client for metrics, we may double count.

If the intent is “planned parquet files to scan,” consider documenting that in the metric help text and/or using a distinct method label (e.g., SCAN) to distinguish from storage-layer GET calls. Otherwise, this looks fine.

Would you like me to open a follow-up to align metric help/labels so dashboards don’t mix “planned scans” with “actual GETs”?
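For illustration, a distinct label at the planning site could look like the sketch below. This is a hedged example rather than a concrete diff: `provider` stands in for whatever provider label is available at that point, and "SCAN" is just a placeholder method name.

    // Illustrative sketch: same per-file increments as today, but under a
    // distinct "SCAN" method label so dashboards can tell planned parquet
    // scans apart from storage-layer GETs. `provider` is assumed to be in scope.
    for _file in &manifest_files {
        STORAGE_FILES_SCANNED
            .with_label_values(&[provider, "SCAN"])
            .inc();
        STORAGE_FILES_SCANNED_DATE
            .with_label_values(&[provider, "SCAN", &Utc::now().date_naive().to_string()])
            .inc();
    }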


87-99: Optional: Auto-create stream/schema from storage in Query/Prism mode

Based on the retrieved learnings from PR #1185, when a table is not present in memory, Query/Prism modes should attempt to create the stream and schema from storage before returning None. This avoids false negatives when the process starts without preloaded stream metadata.

Using the pattern from the learnings:

// Pseudocode inside GlobalSchemaProvider::table(...)
if !self.table_exist(name)
   && (PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism)
{
    if PARSEABLE.create_stream_and_schema_from_storage(name).await.unwrap_or(false) {
        // proceed to return the table as in the happy path
    } else {
        return Ok(None);
    }
}

I can send a concrete diff if you confirm create_stream_and_schema_from_storage is available here.

Note: This suggestion leverages the “retrieved learnings” provided for this repository.


495-513: Query/Prism: merging snapshots from stream.json is fine; consider error/metrics posture

You’re permissive on errors (failures to read or parse stream.json are ignored), which favors availability. If you want observability, at minimum log a debug/warn with the failing key and the error cause. Also, if the storage layer already emits metrics for get_objects here, consider relying on those rather than adding counters locally, to avoid duplication.

I can add structured logs here with provider/path labels if you want to keep the “best effort” behavior but make failures visible in traces.
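A minimal sketch of that best-effort logging, assuming `tracing` is the logging facade and that `stream_json_path`/`get_object` match the surrounding code (names are illustrative):

    match storage.get_object(&stream_json_path).await {
        Ok(bytes) => {
            // parse and merge the snapshot exactly as before
        }
        Err(err) => {
            // Best effort: skip this stream.json, but leave a trace of the failure.
            tracing::warn!(path = %stream_json_path, error = %err,
                "failed to read stream.json while merging snapshots; skipping");
        }
    }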


586-596: Minor: pass time_partition by reference consistently

A few call sites clone the Option while others go through as_ref()/cloned(); usage is already mixed above. Not a blocker, just a reminder to keep it consistent for readability.

src/metrics/mod.rs (1)

163-173: Nit: clarify help text to reflect “global totals across all streams”.

Per the PR discussion and retrieved learnings, TOTAL_*_DATE metrics intentionally aggregate across all streams using labels ["format","date"]. To avoid future confusion with per-stream metrics, tweak the help text to explicitly say “across all streams.”

         Opts::new(
             "total_events_ingested_date",
-            "total events ingested on a particular date",
+            "Total events ingested across all streams on a particular date",
         )
...
         Opts::new(
             "total_events_ingested_size_date",
-            "Total events ingested size in bytes on a particular date",
+            "Total events ingested size in bytes across all streams on a particular date",
         )
...
         Opts::new(
             "total_events_storage_size_date",
-            "Total events storage size in bytes on a particular date",
+            "Total events storage size in bytes across all streams on a particular date",
         )

Also applies to: 175-185, 187-197

src/storage/metrics_layer.rs (1)

187-201: Optional: consider counting “files scanned” in the layer for DataFusion code paths.

Operations invoked via DataFusion go through this MetricLayer, not the provider modules. If you want files-scanned to be complete "for all object store APIs," add STORAGE_FILES_SCANNED/STORAGE_FILES_SCANNED_DATE increments here as well (mirroring the provider modules). If keeping counts only in provider code is intentional, feel free to ignore this.

Also applies to: 203-217, 219-233, 235-253, 255-269, 271-285, 321-335, 337-351, 353-367, 369-399
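If you do want layer-level counts, a hedged sketch of the GET path (assuming the layer wraps an inner store and carries the provider label added in this PR as a `&'static str`; field names are illustrative):

    async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
        // Mirror the provider modules: one "scanned file" per attempted GET.
        STORAGE_FILES_SCANNED
            .with_label_values(&[self.provider, "GET"])
            .inc();
        STORAGE_FILES_SCANNED_DATE
            .with_label_values(&[self.provider, "GET", &Utc::now().date_naive().to_string()])
            .inc();
        self.inner.get(location).await
    }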

src/storage/azure_blob.rs (1)

708-737: HEAD “files scanned” should count attempts, not only successes.

Currently HEAD increments STORAGE_FILES_SCANNED only on Ok, whereas GCS HEAD increments regardless of outcome. For consistency, and to match "files-scanned count for all object store APIs," increment before the match.

     async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
         let head_start = Instant::now();
         let result = self.client.head(&to_object_store_path(path)).await;
         let head_elapsed = head_start.elapsed().as_secs_f64();
-        match &result {
+        // Count the attempted HEAD
+        STORAGE_FILES_SCANNED
+            .with_label_values(&["azure_blob", "HEAD"])
+            .inc();
+        STORAGE_FILES_SCANNED_DATE
+            .with_label_values(&["azure_blob", "HEAD", &Utc::now().date_naive().to_string()])
+            .inc();
+        match &result {
             Ok(_) => {
                 STORAGE_REQUEST_RESPONSE_TIME
                     .with_label_values(&["azure_blob", "HEAD", "200"])
                     .observe(head_elapsed);
-                // Record single file accessed
-                STORAGE_FILES_SCANNED
-                    .with_label_values(&["azure_blob", "HEAD"])
-                    .inc();
-                STORAGE_FILES_SCANNED_DATE
-                    .with_label_values(&[
-                        "azure_blob",
-                        "HEAD",
-                        &Utc::now().date_naive().to_string(),
-                    ])
-                    .inc();
             }
src/storage/localfs.rs (2)

404-426: Optional: count PUT attempts regardless of outcome for consistency.

Other providers increment STORAGE_FILES_SCANNED for PUT before inspecting the result; here it happens only on Ok. Consider moving the increment above the match.

         let res = fs::write(path, resource).await;
         let put_elapsed = put_start.elapsed().as_secs_f64();
+        // Count attempted PUT
+        STORAGE_FILES_SCANNED
+            .with_label_values(&["localfs", "PUT"])
+            .inc();
+        STORAGE_FILES_SCANNED_DATE
+            .with_label_values(&["localfs", "PUT", &Utc::now().date_naive().to_string()])
+            .inc();
 
         match &res {
             Ok(_) => {
                 STORAGE_REQUEST_RESPONSE_TIME
                     .with_label_values(&["localfs", "PUT", "200"])
                     .observe(put_elapsed);
-                // Record single file written successfully
-                STORAGE_FILES_SCANNED
-                    .with_label_values(&["localfs", "PUT"])
-                    .inc();
-                STORAGE_FILES_SCANNED_DATE
-                    .with_label_values(&["localfs", "PUT", &Utc::now().date_naive().to_string()])
-                    .inc();
             }

838-867: Avoid blocking the async runtime when copying large files.

fs_extra::file::copy is blocking and can stall the Tokio executor under load. Wrap it in spawn_blocking to avoid starving other tasks.

-        let result = fs_extra::file::copy(path, to_path, &op);
+        let to_path_clone = to_path.clone();
+        let op_clone = op.clone();
+        let result = tokio::task::spawn_blocking(move || {
+            fs_extra::file::copy(path, to_path_clone, &op_clone)
+        })
+        .await
+        .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?;
src/storage/s3.rs (3)

1076-1096: Count DELETE attempts, not only successes, for consistent “files scanned” semantics

Elsewhere (GET/HEAD), you increment STORAGE_FILES_SCANNED before knowing the outcome, i.e., per attempt. Here it increments only on success. Move the counters before the match so DELETE aligns with the “attempts” model from the PR objectives.

Apply this diff:

-        match &result {
-            Ok(_) => {
-                STORAGE_REQUEST_RESPONSE_TIME
-                    .with_label_values(&["s3", "DELETE", "200"])
-                    .observe(delete_elapsed);
-                // Record single file deleted
-                STORAGE_FILES_SCANNED
-                    .with_label_values(&["s3", "DELETE"])
-                    .inc();
-                STORAGE_FILES_SCANNED_DATE
-                    .with_label_values(&["s3", "DELETE", &Utc::now().date_naive().to_string()])
-                    .inc();
-            }
+        // Count the DELETE attempt regardless of outcome
+        STORAGE_FILES_SCANNED
+            .with_label_values(&["s3", "DELETE"])
+            .inc();
+        STORAGE_FILES_SCANNED_DATE
+            .with_label_values(&["s3", "DELETE", &Utc::now().date_naive().to_string()])
+            .inc();
+        match &result {
+            Ok(_) => {
+                STORAGE_REQUEST_RESPONSE_TIME
+                    .with_label_values(&["s3", "DELETE", "200"])
+                    .observe(delete_elapsed);
+            }

694-713: Multipart uploads: confirm whether files-scanned should be recorded for PUT_MULTIPART and COMPLETE

Small-file fallback increments PUT counters, but true multipart flows record only latency (no files-scanned). If the intent is “files-scanned count for all object store APIs,” add a single increment for PUT_MULTIPART initiation and one for PUT_MULTIPART_COMPLETE (attempts, not per-part).

Suggested adjustments:

         let async_writer = self.client.put_multipart(location).await;
         let multipart_elapsed = multipart_start.elapsed().as_secs_f64();
+        // Count the multipart initiation attempt
+        STORAGE_FILES_SCANNED
+            .with_label_values(&["s3", "PUT_MULTIPART"])
+            .inc();
+        STORAGE_FILES_SCANNED_DATE
+            .with_label_values(&["s3", "PUT_MULTIPART", &Utc::now().date_naive().to_string()])
+            .inc();

@@
-            if let Err(err) = complete_result {
+            // Count the multipart completion attempt
+            STORAGE_FILES_SCANNED
+                .with_label_values(&["s3", "PUT_MULTIPART_COMPLETE"])
+                .inc();
+            STORAGE_FILES_SCANNED_DATE
+                .with_label_values(&["s3", "PUT_MULTIPART_COMPLETE", &Utc::now().date_naive().to_string()])
+                .inc();
+            if let Err(err) = complete_result {
                 let status_code = error_to_status_code(&err);
                 STORAGE_REQUEST_RESPONSE_TIME
                     .with_label_values(&["s3", "PUT_MULTIPART_COMPLETE", status_code])
                     .observe(complete_elapsed);

Note: I’m intentionally not incrementing for each PUT_MULTIPART_PART to avoid inflating counts by part numbers. If you do want that, add a single increment per part beside each put_part.

Also applies to: 797-813


347-350: Unify Instant usage; prefer the imported Instant over fully qualified std::time::Instant

Minor consistency/readability nit: the file already imports time::Instant. Use it everywhere.

Apply this diff:

-        let time = std::time::Instant::now();
+        let time = Instant::now();
@@
-        let time = std::time::Instant::now();
+        let time = Instant::now();

Also applies to: 379-382

src/metrics/storage.rs (2)

25-32: Tune histogram buckets for storage latencies

Default buckets can be skewed for S3-like latencies. Consider custom buckets (e.g., sub-10ms to tens of seconds) to capture both cache hits and cold-path operations better.

Example:

 pub static STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
-    HistogramVec::new(
-        HistogramOpts::new("storage_request_response_time", "Storage Request Latency")
-            .namespace(METRICS_NAMESPACE),
-        &["provider", "method", "status"],
-    )
+    {
+        let opts = HistogramOpts::new("storage_request_response_time", "Storage Request Latency")
+            .namespace(METRICS_NAMESPACE)
+            .buckets(vec![
+                0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5,
+                1.0, 2.0, 5.0, 10.0, 20.0, 30.0,
+            ]);
+        HistogramVec::new(opts, &["provider", "method", "status"])
+    }
     .expect("metric can be created")
 });

63-83: DRY: centralize metric registration to remove duplicated code across providers

All four impls register the same three collectors. Factor this into a helper to reduce duplication and keep future changes in one place.

One way:

 pub trait StorageMetrics {
     fn register_metrics(&self, handler: &PrometheusMetrics);
 }
 
+fn register_storage_metrics(handler: &PrometheusMetrics) {
+    handler
+        .registry
+        .register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone()))
+        .expect("metric can be registered");
+    handler
+        .registry
+        .register(Box::new(STORAGE_FILES_SCANNED.clone()))
+        .expect("metric can be registered");
+    handler
+        .registry
+        .register(Box::new(STORAGE_FILES_SCANNED_DATE.clone()))
+        .expect("metric can be registered");
+}
@@
 impl StorageMetrics for FSConfig {
     fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
-        handler
-            .registry
-            .register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone()))
-            .expect("metric can be registered");
-        handler
-            .registry
-            .register(Box::new(STORAGE_FILES_SCANNED.clone()))
-            .expect("metric can be registered");
-        handler
-            .registry
-            .register(Box::new(STORAGE_FILES_SCANNED_DATE.clone()))
-            .expect("metric can be registered");
+        register_storage_metrics(handler);
     }
 }
@@
 impl StorageMetrics for S3Config {
     fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
-        handler
-            .registry
-            .register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone()))
-            .expect("metric can be registered");
-        handler
-            .registry
-            .register(Box::new(STORAGE_FILES_SCANNED.clone()))
-            .expect("metric can be registered");
-        handler
-            .registry
-            .register(Box::new(STORAGE_FILES_SCANNED_DATE.clone()))
-            .expect("metric can be registered");
+        register_storage_metrics(handler);
     }
 }
@@
 impl StorageMetrics for AzureBlobConfig {
     fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
-        handler
-            .registry
-            .register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone()))
-            .expect("metric can be registered");
-        handler
-            .registry
-            .register(Box::new(STORAGE_FILES_SCANNED.clone()))
-            .expect("metric can be registered");
-        handler
-            .registry
-            .register(Box::new(STORAGE_FILES_SCANNED_DATE.clone()))
-            .expect("metric can be registered");
+        register_storage_metrics(handler);
     }
 }
@@
 impl StorageMetrics for GcsConfig {
     fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
-        handler
-            .registry
-            .register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone()))
-            .expect("metric can be registered");
-        handler
-            .registry
-            .register(Box::new(STORAGE_FILES_SCANNED.clone()))
-            .expect("metric can be registered");
-        handler
-            .registry
-            .register(Box::new(STORAGE_FILES_SCANNED_DATE.clone()))
-            .expect("metric can be registered");
+        register_storage_metrics(handler);
     }
 }

Note: Per the maintainer’s note and learnings, only one provider is initialized per process, so there’s no multi-registration panic risk; this change is purely to reduce duplication.

Also applies to: 86-107, 109-130, 132-153

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 6850fb2 and c88615f.

📒 Files selected for processing (14)
  • src/catalog/mod.rs (1 hunks)
  • src/metadata.rs (3 hunks)
  • src/metrics/mod.rs (6 hunks)
  • src/metrics/storage.rs (1 hunks)
  • src/query/listing_table_builder.rs (2 hunks)
  • src/query/mod.rs (1 hunks)
  • src/query/stream_schema_provider.rs (12 hunks)
  • src/stats.rs (3 hunks)
  • src/storage/azure_blob.rs (24 hunks)
  • src/storage/gcs.rs (23 hunks)
  • src/storage/localfs.rs (15 hunks)
  • src/storage/metrics_layer.rs (8 hunks)
  • src/storage/object_storage.rs (2 hunks)
  • src/storage/s3.rs (24 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • src/query/mod.rs
  • src/catalog/mod.rs
  • src/storage/object_storage.rs
  • src/metadata.rs
  • src/stats.rs
  • src/query/listing_table_builder.rs
🧰 Additional context used
🧠 Learnings (7)
📚 Learning: 2025-08-25T01:31:41.768Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.768Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.

Applied to files:

  • src/storage/gcs.rs
  • src/storage/metrics_layer.rs
  • src/metrics/mod.rs
  • src/storage/localfs.rs
  • src/storage/s3.rs
  • src/metrics/storage.rs
  • src/storage/azure_blob.rs
📚 Learning: 2025-08-25T01:32:25.937Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.937Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.

Applied to files:

  • src/storage/gcs.rs
  • src/metrics/mod.rs
  • src/storage/localfs.rs
  • src/metrics/storage.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.

Applied to files:

  • src/storage/gcs.rs
  • src/storage/s3.rs
  • src/storage/azure_blob.rs
📚 Learning: 2025-08-21T14:41:55.462Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1410
File: src/storage/object_storage.rs:876-916
Timestamp: 2025-08-21T14:41:55.462Z
Learning: In Parseable's object storage system (src/storage/object_storage.rs), date directories (date=YYYY-MM-DD) are only created when there's actual data to store, which means they will always contain corresponding hour and minute subdirectories. There can be no case where a date directory exists without hour or minute subdirectories.

Applied to files:

  • src/storage/gcs.rs
  • src/storage/localfs.rs
  • src/storage/s3.rs
  • src/storage/azure_blob.rs
📚 Learning: 2025-08-21T11:47:01.279Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1410
File: src/storage/object_storage.rs:0-0
Timestamp: 2025-08-21T11:47:01.279Z
Learning: In Parseable's object storage implementation (src/storage/object_storage.rs), the hour and minute directory prefixes (hour=XX, minute=YY) are generated from arrow file timestamps following proper datetime conventions, so they are guaranteed to be within valid ranges (0-23 for hours, 0-59 for minutes) and don't require additional range validation.

Applied to files:

  • src/storage/gcs.rs
  • src/storage/s3.rs
  • src/storage/azure_blob.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/query/stream_schema_provider.rs
📚 Learning: 2025-08-25T01:33:36.398Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/metrics/storage.rs:51-68
Timestamp: 2025-08-25T01:33:36.398Z
Learning: In the Parseable storage system, only one storage provider (localfs, s3, azureblob, or gcs) is meant to be initialized per process, which ensures that global metrics like STORAGE_REQUEST_RESPONSE_TIME and STORAGE_FILES_SCANNED are registered exactly once without risk of duplicate registration panics.

Applied to files:

  • src/metrics/storage.rs
🧬 Code graph analysis (6)
src/storage/gcs.rs (2)
src/storage/metrics_layer.rs (3)
  • error_to_status_code (40-66)
  • new (75-80)
  • head (255-269)
src/storage/object_storage.rs (3)
  • parseable_json_path (1281-1283)
  • new (82-91)
  • head (209-209)
src/storage/metrics_layer.rs (3)
src/storage/azure_blob.rs (1)
  • head (708-738)
src/storage/gcs.rs (1)
  • head (695-720)
src/storage/s3.rs (3)
  • head (865-890)
  • from (1434-1442)
  • from (1446-1448)
src/query/stream_schema_provider.rs (3)
src/parseable/mod.rs (4)
  • storage (244-246)
  • new (146-158)
  • serde_json (300-300)
  • serde_json (306-306)
src/enterprise/utils.rs (1)
  • collect_manifest_files (159-182)
src/storage/s3.rs (2)
  • from (1434-1442)
  • from (1446-1448)
src/storage/localfs.rs (2)
src/storage/metrics_layer.rs (1)
  • copy (337-351)
src/storage/azure_blob.rs (3)
  • get_ingestor_meta_file_paths (812-850)
  • delete_object (919-950)
  • check (952-981)
src/storage/s3.rs (3)
src/storage/metrics_layer.rs (3)
  • error_to_status_code (40-66)
  • new (75-80)
  • head (255-269)
src/storage/localfs.rs (6)
  • new (104-106)
  • name (76-78)
  • from (985-987)
  • head (136-150)
  • get_ingestor_meta_file_paths (190-242)
  • check (496-520)
src/storage/azure_blob.rs (12)
  • resp (347-353)
  • resp (477-483)
  • resp (1240-1245)
  • resp (1284-1289)
  • _delete_prefix (278-333)
  • _list_streams (335-400)
  • name (169-171)
  • _list_dates (402-443)
  • _upload_file (529-557)
  • head (708-738)
  • get_ingestor_meta_file_paths (812-850)
  • check (952-981)
src/storage/azure_blob.rs (3)
src/storage/metrics_layer.rs (3)
  • error_to_status_code (40-66)
  • new (75-80)
  • head (255-269)
src/storage/object_storage.rs (4)
  • parseable_json_path (1281-1283)
  • new (82-91)
  • name (200-200)
  • head (209-209)
src/storage/gcs.rs (9)
  • resp (312-318)
  • resp (442-448)
  • resp (1206-1211)
  • resp (1246-1251)
  • _delete_prefix (243-298)
  • _list_streams (300-365)
  • name (130-132)
  • _upload_file (494-522)
  • head (695-720)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
🔇 Additional comments (6)
src/query/stream_schema_provider.rs (3)

19-67: Imports and storage/metrics wiring look correct

  • Moving to PathBuf/RelativePathBuf and the ObjectStorage abstraction is consistent with the PR objectives.
  • TryFutureExt is used later (and_then chain), so no unused import.
  • Metrics imports (STORAGE_FILES_SCANNED, STORAGE_FILES_SCANNED_DATE) align with the new counters.

284-301: populate_via_listing now accepts ObjectStorage — good abstraction boundary

Switching ListingTableBuilder::populate_via_listing to take the storage layer rather than a raw object_store keeps the query layer decoupled and lets the storage layer own metrics and provider-specific logic. The TryFutureExt.and_then usage keeps the flow clean.


404-422: Good: storage plumbed through collect_from_snapshot with proper error mapping

Propagating ObjectStorageError via DataFusionError::External is the right choice here; this preserves the original error chain for diagnostics.

src/metrics/mod.rs (2)

268-276: Registration of TOTAL_*_DATE metrics looks correct.

All three TOTAL_*_DATE metrics are registered in the custom registry; no gaps spotted.


127-137: It looks like there are no remaining .set() calls on those three metrics, so all call sites have been updated to use inc()/inc_by().
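For reference, a counter-style call site now looks roughly like this (a hedged sketch; metric and variable names are illustrative, using the per-stream label set [stream, origin, date] from the learnings above and assuming an IntCounterVec with u64 counts):

    // Counters only move forward: accumulate the day's ingestion instead of set().
    EVENTS_INGESTED_DATE
        .with_label_values(&[&stream_name, origin, &parsed_date])
        .inc_by(num_events);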

src/storage/metrics_layer.rs (1)

37-66: Status mapping helper and provider label plumbing look solid.

The error_to_status_code coverage is pragmatic and the provider field in MetricLayer enables per-provider partitioning without code duplication. No issues here.

Comment on lines 853 to 876
 pub async fn collect_manifest_files(
-    storage: Arc<dyn ObjectStore>,
+    storage: Arc<dyn ObjectStorage>,
     manifest_urls: Vec<String>,
-) -> Result<Vec<Manifest>, object_store::Error> {
-    let tasks = manifest_urls.into_iter().map(|path| {
-        let path = Path::parse(path).unwrap();
+) -> Result<Vec<Manifest>, ObjectStorageError> {
+    let mut tasks = Vec::new();
+    manifest_urls.into_iter().for_each(|path| {
+        let path = RelativePathBuf::from_path(PathBuf::from(path)).expect("Invalid path");
         let storage = Arc::clone(&storage);
-        async move { storage.get(&path).await }
+        tasks.push(tokio::task::spawn(async move {
+            storage.get_object(&path).await
+        }));
     });

-    let resp = FuturesOrdered::from_iter(tasks)
-        .and_then(|res| res.bytes())
-        .collect::<Vec<object_store::Result<Bytes>>>()
-        .await;
+    let mut op = Vec::new();
+    for task in tasks {
+        let file = task.await??;
+        op.push(file);
+    }

-    Ok(resp
+    Ok(op
         .into_iter()
-        .flat_map(|res| res.ok())
-        .map(|bytes| serde_json::from_slice(&bytes).unwrap())
+        .map(|res| serde_json::from_slice(&res).expect("Data is invalid for Manifest"))
         .collect())
 }

⚠️ Potential issue

Harden manifest collection: eliminate panics, bound concurrency, keep error context

Current issues:

  • Path conversion and JSON deserialization use expect(), which can panic on corrupt manifests or bad paths.
  • Spawning an unbounded number of tasks scales poorly with large manifest lists.
  • Error context (which path failed) is lost.

Refactor to:

  • Bound concurrency with buffer_unordered.
  • Map errors into ObjectStorageError without panicking.
  • Keep the error chain so provider/status can be surfaced by callers.

Apply this diff:

-pub async fn collect_manifest_files(
-    storage: Arc<dyn ObjectStorage>,
-    manifest_urls: Vec<String>,
-) -> Result<Vec<Manifest>, ObjectStorageError> {
-    let mut tasks = Vec::new();
-    manifest_urls.into_iter().for_each(|path| {
-        let path = RelativePathBuf::from_path(PathBuf::from(path)).expect("Invalid path");
-        let storage = Arc::clone(&storage);
-        tasks.push(tokio::task::spawn(async move {
-            storage.get_object(&path).await
-        }));
-    });
-
-    let mut op = Vec::new();
-    for task in tasks {
-        let file = task.await??;
-        op.push(file);
-    }
-
-    Ok(op
-        .into_iter()
-        .map(|res| serde_json::from_slice(&res).expect("Data is invalid for Manifest"))
-        .collect())
-}
+pub async fn collect_manifest_files(
+    storage: Arc<dyn ObjectStorage>,
+    manifest_urls: Vec<String>,
+) -> Result<Vec<Manifest>, ObjectStorageError> {
+    // Bound concurrency to prevent unbounded task fan-out for large manifest sets.
+    let concurrency = std::cmp::min(32, std::cmp::max(1, num_cpus::get() * 2));
+
+    let results = futures_util::stream::iter(manifest_urls.into_iter())
+        .map(|path| {
+            let storage = Arc::clone(&storage);
+            async move {
+                let path = RelativePathBuf::from_path(PathBuf::from(path))
+                    .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?;
+                storage.get_object(&path).await
+            }
+        })
+        .buffer_unordered(concurrency)
+        .collect::<Vec<_>>()
+        .await;
+
+    let mut manifests = Vec::with_capacity(results.len());
+    for res in results {
+        let bytes = res?;
+        let manifest: Manifest = serde_json::from_slice(&bytes)
+            .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?;
+        manifests.push(manifest);
+    }
+    Ok(manifests)
+}

Additionally, add these imports near the other futures imports:

use futures_util::stream::{self, StreamExt};

If you’d like, I can wire in richer error variants (e.g., InvalidPath, InvalidManifest) on ObjectStorageError and convert serde/path errors accordingly, so observability improves without relying on “UnhandledError”.
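For reference, a hedged sketch of what such variants could look like (variant names are illustrative and assume the error type derives thiserror::Error; relative_path::FromPathError and serde_json::Error are the underlying error types used above):

    #[derive(Debug, thiserror::Error)]
    pub enum ObjectStorageError {
        // ...existing variants elided...
        #[error("invalid manifest path {path}: {source}")]
        InvalidPath {
            path: String,
            source: relative_path::FromPathError,
        },
        #[error("invalid manifest at {path}: {source}")]
        InvalidManifest {
            path: String,
            source: serde_json::Error,
        },
    }

With variants like these in place, collect_manifest_files could attach the failing path to both path-conversion and deserialization errors instead of funneling everything through UnhandledError.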

🤖 Prompt for AI Agents
In src/query/stream_schema_provider.rs around lines 853 to 876, replace the
current unbounded spawn+expect-based implementation: convert each manifest URL
to a Path once (returning a mapped ObjectStorageError instead of panicking on
invalid path), build a futures stream from the path list, use
stream::iter(...).map(|path| async move {
storage.get_object(&path).await.map(|b| (path, b)) }).buffer_unordered(N) to
bound concurrency, collect results while mapping any errors into
ObjectStorageError that include the failing path (preserve the original error as
source), and parse JSON with serde_json::from_slice mapping deserialization
failures into ObjectStorageError with path context instead of expect; also add
the import use futures_util::stream::{self, StreamExt}; to enable
buffer_unordered. Ensure all error mappings carry path info so callers can
surface provider/status.

Comment on lines +784 to +792
            STORAGE_REQUEST_RESPONSE_TIME
                .with_label_values(&["azure_blob", "GET", "200"])
                .observe(list_start.elapsed().as_secs_f64());
            STORAGE_FILES_SCANNED
                .with_label_values(&["azure_blob", "GET"])
                .inc();
            STORAGE_FILES_SCANNED_DATE
                .with_label_values(&["azure_blob", "GET", &Utc::now().date_naive().to_string()])
                .inc();

🛠️ Refactor suggestion

Remove duplicate GET timing and counting from get_objects loop.

Same duplication as in GCS: _get_object() already records GET timing and GET files-scanned. The loop uses list_start for GET latency and increments again.

-            STORAGE_REQUEST_RESPONSE_TIME
-                .with_label_values(&["azure_blob", "GET", "200"])
-                .observe(list_start.elapsed().as_secs_f64());
-            STORAGE_FILES_SCANNED
-                .with_label_values(&["azure_blob", "GET"])
-                .inc();
-            STORAGE_FILES_SCANNED_DATE
-                .with_label_values(&["azure_blob", "GET", &Utc::now().date_naive().to_string()])
-                .inc();
🤖 Prompt for AI Agents
In src/storage/azure_blob.rs around lines 784 to 792, the loop is redundantly
recording GET metrics that are already recorded by get_object(); remove the
three metric updates inside this loop (the STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["azure_blob","GET","200"]).observe(...),
STORAGE_FILES_SCANNED .with_label_values(&["azure_blob","GET"]).inc(), and
STORAGE_FILES_SCANNED_DATE
.with_label_values(&["azure_blob","GET",&Utc::now().date_naive().to_string()]).inc())
so that GET timing and files-scanned are only recorded once by get_object();
leave any non-GET or listing metrics intact.

Comment on lines 356 to 371
         match resp {
             Ok(resp) => {
-                let time = instant.elapsed().as_secs_f64();
-                REQUEST_RESPONSE_TIME
-                    .with_label_values(&["GET", "200"])
-                    .observe(time);
-                let body = resp.bytes().await.unwrap();
+                let body: Bytes = resp.bytes().await.unwrap();
+                STORAGE_REQUEST_RESPONSE_TIME
+                    .with_label_values(&["s3", "GET", "200"])
+                    .observe(elapsed);
                 Ok(body)
             }
             Err(err) => {
-                let time = instant.elapsed().as_secs_f64();
-                REQUEST_RESPONSE_TIME
-                    .with_label_values(&["GET", "400"])
-                    .observe(time);
+                let status_code = error_to_status_code(&err);
+                STORAGE_REQUEST_RESPONSE_TIME
+                    .with_label_values(&["s3", "GET", status_code])
+                    .observe(elapsed);
                 Err(err.into())
             }
         }

⚠️ Potential issue

Avoid panic on GET body read; record error status instead of unwrap()

resp.bytes().await.unwrap() can panic and will also mislabel the request as 200 if the body read fails after a successful GET. Handle the body read error, map it to a status via error_to_status_code, and return a proper ObjectStorageError. Also keep the latency observation consistent for both success and failure paths.

Apply this diff:

-            Ok(resp) => {
-                let body: Bytes = resp.bytes().await.unwrap();
-                STORAGE_REQUEST_RESPONSE_TIME
-                    .with_label_values(&["s3", "GET", "200"])
-                    .observe(elapsed);
-                Ok(body)
-            }
+            Ok(resp) => {
+                match resp.bytes().await {
+                    Ok(body) => {
+                        STORAGE_REQUEST_RESPONSE_TIME
+                            .with_label_values(&["s3", "GET", "200"])
+                            .observe(elapsed);
+                        Ok(body)
+                    }
+                    Err(err) => {
+                        let status_code = error_to_status_code(&err);
+                        STORAGE_REQUEST_RESPONSE_TIME
+                            .with_label_values(&["s3", "GET", status_code])
+                            .observe(elapsed);
+                        Err(err.into())
+                    }
+                }
+            }
🤖 Prompt for AI Agents
In src/storage/s3.rs around lines 356 to 371, resp.bytes().await.unwrap() may
panic and incorrectly label the request as "200" even when reading the body
fails; change the flow to await resp.bytes().await and match its Result so that
body-read errors are handled: on Ok(body) record latency with label
["s3","GET","200"] and return the body, on Err(e) map the error to a status
string using error_to_status_code(&e), observe the latency with that status,
convert the bytes-read error into the appropriate ObjectStorageError (or map
into the function's error type) and return it instead of panicking; keep the
original resp error handling but ensure both response- and body-read-failure
paths use the same latency observation logic and correct status labeling.

Comment on lines +936 to +944
            STORAGE_REQUEST_RESPONSE_TIME
                .with_label_values(&["s3", "GET", "200"])
                .observe(list_start.elapsed().as_secs_f64());
            STORAGE_FILES_SCANNED
                .with_label_values(&["s3", "GET"])
                .inc();
            STORAGE_FILES_SCANNED_DATE
                .with_label_values(&["s3", "GET", &Utc::now().date_naive().to_string()])
                .inc();

💡 Verification agent

🧩 Analysis chain

Remove incorrect GET metrics in get_objects() (double count + wrong latency source)

This block double-instruments GETs already covered inside _get_object() and uses list_start (LIST latency) for the GET histogram, skewing the data. Rely on _get_object() for GET metrics and keep LIST metrics scoped to the listing.

Apply this diff to drop the duplicate/incorrect GET metrics:

-            STORAGE_REQUEST_RESPONSE_TIME
-                .with_label_values(&["s3", "GET", "200"])
-                .observe(list_start.elapsed().as_secs_f64());
-            STORAGE_FILES_SCANNED
-                .with_label_values(&["s3", "GET"])
-                .inc();
-            STORAGE_FILES_SCANNED_DATE
-                .with_label_values(&["s3", "GET", &Utc::now().date_naive().to_string()])
-                .inc();

To confirm there are no remaining duplicate GET histograms under "s3", run:


🏁 Script executed:

#!/bin/bash
# Find all GET histogram observations under the s3 provider
rg -nP --type=rust -C2 'STORAGE_REQUEST_RESPONSE_TIME.*\["s3",\s*"GET"'

Length of output: 73


🏁 Script executed:

#!/bin/bash
# Display lines around the GET instrumentation in get_objects (approx. lines 936-944)
sed -n '900,960p' src/storage/s3.rs || echo 'Cannot read src/storage/s3.rs'

echo "\n--- Locate _get_object definition and its GET metrics ---"
rg -nP 'fn\s+_get_object' -C5 src/storage/s3.rs || echo 'fn _get_object not found'
rg -nP 'STORAGE_.*with_label_values.*"s3",\s*"GET"' -C3 src/storage/s3.rs || echo 'No GET metrics found in _get_object or elsewhere'

Length of output: 2969


🏁 Script executed:

#!/bin/bash
# Display complete _get_object implementation to verify GET metrics
sed -n '330,420p' src/storage/s3.rs || echo "Failed to read src/storage/s3.rs"

echo -e "\n--- All occurrences of GET labels in s3.rs ---"
rg -n '\["s3",\s*"GET"' -C2 src/storage/s3.rs || echo "No GET label series found"

Length of output: 4786


Remove duplicate GET metrics in get_objects()

The instrumentation block at src/storage/s3.rs:936–944 duplicates the GET metrics already emitted by _get_object(), resulting in double-counted operations and skewed latency values. Please drop this snippet and rely on the existing _get_object() instrumentation to capture GET metrics.

• File: src/storage/s3.rs
  – Lines 936–944 (inside get_objects())
• Reference: _get_object() at lines 346–365 already records
  – STORAGE_FILES_SCANNED / STORAGE_FILES_SCANNED_DATE for GET
  – STORAGE_REQUEST_RESPONSE_TIME for GET (success and error)

Apply this diff to remove the duplicate GET metrics:

-            STORAGE_REQUEST_RESPONSE_TIME
-                .with_label_values(&["s3", "GET", "200"])
-                .observe(list_start.elapsed().as_secs_f64());
-            STORAGE_FILES_SCANNED
-                .with_label_values(&["s3", "GET"])
-                .inc();
-            STORAGE_FILES_SCANNED_DATE
-                .with_label_values(&["s3", "GET", &Utc::now().date_naive().to_string()])
-                .inc();
🤖 Prompt for AI Agents
In src/storage/s3.rs around lines 936 to 944, remove the instrumentation snippet
that emits STORAGE_REQUEST_RESPONSE_TIME, STORAGE_FILES_SCANNED, and
STORAGE_FILES_SCANNED_DATE for "s3"/"GET" inside get_objects(); these metrics
are already recorded in _get_object() (lines ~346–365) and must not be
duplicated — delete that block and ensure no other duplicate GET metric
emissions remain in get_objects().
