nikhilsinhaparseable (Contributor) commented Aug 11, 2025

Create a stream with a custom time partition.
Use the headers:
X-P-Time-Partition: <field-name> (this has to be a timestamp field)
X-P-Time-Partition-Limit: max historical range, defaults to 30d

The server validates that every event in the batch has this time-partition field; if any event lacks it, the whole batch is rejected.
Fixes: #1400
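
The all-or-nothing batch rule in the description can be sketched roughly as follows. This is an illustration only: `Event` and `validate_batch` are hypothetical names, and a plain string map stands in for the JSON objects the server actually receives.

```rust
use std::collections::HashMap;

/// A simplified event: field name -> raw string value.
/// (Hypothetical stand-in for the JSON objects handled by the server.)
type Event = HashMap<String, String>;

/// Returns Ok(()) only if every event in the batch carries `time_partition`.
/// The first event that lacks the field causes the whole batch to be rejected.
fn validate_batch(events: &[Event], time_partition: &str) -> Result<(), String> {
    for (index, event) in events.iter().enumerate() {
        if !event.contains_key(time_partition) {
            return Err(format!(
                "event {index} is missing time partition field `{time_partition}`; rejecting batch"
            ));
        }
    }
    Ok(())
}
```

Checking the whole batch before writing anything means a rejected batch leaves no partial data behind, which is presumably why the description rejects the batch rather than dropping individual events.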

Summary by CodeRabbit

  • New Features

    • Centralized stream validation for ingestion and propagation of optional time-partition info across OTEL/HTTP paths.
    • Streams may use time-based partitioning with explicit retention; time_partition and custom_partition are enforced mutually exclusive.
  • Refactor

    • Partitioned, batched manifest updates with parallel bucketing; concurrent bounded uploads, improved snapshot handling, and optional stats.
    • Query execution simplified to per-table time-partition inference; JSON flattening unified for partitioned vs non-partitioned inputs.
    • Several ingestion/handler constants made public.
  • Bug Fixes

    • Prevented overwriting staging files by renaming preexisting files with unique prefixes.
  • Chores

    • Added parallelism dependency for partitioned processing.
    • Removed a legacy HTTP ingest handler.

coderabbitai bot (Contributor) commented Aug 11, 2025

Walkthrough

This PR adds batch-oriented, partition-aware ingestion and JSON flattening, a concurrent staging upload pipeline, and catalog snapshot updates that are processed per daily partition. The query API is simplified to infer the time partition per table. The PR also makes several handler constants public and adds the rayon dependency.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Dependency: Cargo.toml | Added rayon = "1.8". |
| Catalog batching & partitioned snapshot: src/catalog/mod.rs | update_snapshot accepts Vec<manifest::File>, groups changes by daily partitions (uses Rayon), processes partitions to create/update manifests, and appends ManifestItems to snapshot; many helper functions added. |
| HTTP ingest & OTEL setup: src/handlers/http/ingest.rs | Centralized stream validation via validate_stream_for_ingestion; setup_otel_stream now returns Option<String> time_partition; calls to flatten_and_push_logs updated to pass new fifth parameter. |
| Ingest utilities: src/handlers/http/modal/utils/ingest_utils.rs | flatten_and_push_logs and push_logs gain time_partition: Option<String>; validate_stream_for_ingestion added; time_partition threaded through ingest paths and cloned for OTEL branches. |
| Handler constants visibility: src/handlers/mod.rs | Several header/flag constants made pub const (e.g., TIME_PARTITION_KEY, CUSTOM_PARTITION_KEY, UPDATE_STREAM_KEY, etc.). |
| Stream creation & validation: src/parseable/mod.rs | Enforces mutual exclusivity of time_partition and custom_partition; computes time_partition_in_days (NonZeroU32) from limit and passes partition + limit into create_stream. |
| Staging writer collision handling: src/parseable/staging/writer.rs | If target exists, rename existing file using an 8-char random alphanumeric prefix + date-portion to avoid overwrite; adds rand usage. |
| Arrow file grouping: src/parseable/streams.rs | Uses ulid::Ulid::new() for in-process arrow file grouping (removed node-id derivation); tests updated accordingly. |
| Field stats logging & calls: src/storage/field_stats.rs | Downgraded some warn! logs to trace!; updated flatten/push calls to pass new optional time_partition param. |
| Concurrent upload & manifest pipeline: src/storage/object_storage.rs | Introduced bounded concurrent upload pipeline (JoinSet + semaphore), UploadContext/UploadResult, per-file upload tasks producing manifests and optional stats, aggregated snapshot updates, and schema commits. |
| JSON flattening (partition-aware): src/utils/json/mod.rs | Added partition-aware flattening helpers; convert_array_to_object refactored to produce partitioned results when partitioning enabled and to return a single-batch Vec for non-partitioned inputs; tests added. |
| Query API & per-table time inference: src/query/*, src/handlers/* | Top-level execute and Query::execute signatures drop stream_name/time_partition; transform now infers per-table time_partition from PARSEABLE and inserts per-table filters; handlers updated to accept Vec<String> table names and call new execute(query, is_streaming). |
| Stream schema provider: src/query/stream_schema_provider.rs | Removed early guard that errored on scans without time bounds so unbounded scans proceed to staging/snapshot planning. |
| Alerts / execute call sites: src/alerts/alerts_utils.rs, src/handlers/airplane.rs, src/handlers/http/query.rs | Call sites updated to new execute(query, is_streaming) signature; handlers adjusted to derive first table from Vec<String> where needed. |
| Removed handler: src/handlers/http/modal/ingest/ingestor_ingest.rs | Deleted the previous per-logstream public post_event handler (ingestor_ingest). |
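
The stream-creation summary above mentions computing time_partition_in_days (NonZeroU32) from the limit. A minimal sketch of that kind of parsing is shown here, assuming the limit is written as a whole number of days with a trailing `d` (as in the `30d` default) and that a missing header falls back to 30 days. `parse_time_partition_limit` is a hypothetical name, not the function in src/parseable/mod.rs.

```rust
use std::num::NonZeroU32;

/// Parses a limit such as "30d" into a non-zero day count.
/// Assumption: a missing header value falls back to the 30-day default.
fn parse_time_partition_limit(raw: Option<&str>) -> Result<NonZeroU32, String> {
    let Some(raw) = raw else {
        return Ok(NonZeroU32::new(30).expect("30 is non-zero"));
    };
    // Format check: the value must end with the day suffix.
    let days = raw
        .strip_suffix('d')
        .ok_or_else(|| format!("limit `{raw}` must end with 'd'"))?;
    // Format check: the remainder must be an unsigned integer.
    let days: u32 = days
        .parse()
        .map_err(|_| format!("limit `{raw}` must be a whole number of days"))?;
    // Zero check: NonZeroU32::new returns None for 0.
    NonZeroU32::new(days).ok_or_else(|| format!("limit `{raw}` must be greater than zero"))
}
```

Using NonZeroU32 as the return type pushes the zero check into the type system, so downstream code never has to re-validate the limit.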

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant HTTP_Ingest
  participant IngestUtils
  participant JSONUtils
  participant Staging
  Client->>HTTP_Ingest: POST /ingest (body, headers)
  HTTP_Ingest->>IngestUtils: validate_stream_for_ingestion(stream_name)
  alt invalid
    IngestUtils-->>Client: PostError
  else
    HTTP_Ingest->>IngestUtils: flatten_and_push_logs(body,..., time_partition?)
    IngestUtils->>JSONUtils: convert_array_to_object(body, time_partition, ...)
    JSONUtils-->>IngestUtils: Vec<Value> (partitioned batches or single batch)
    IngestUtils->>Staging: write staged files
    Staging-->>Client: 200 OK
  end
sequenceDiagram
  participant Staging
  participant ObjStorage
  participant WorkerPool
  participant StorageBackend
  participant Catalog
  participant Stats
  Staging->>ObjStorage: process_parquet_files(stream, files)
  ObjStorage->>WorkerPool: spawn upload tasks (bounded semaphore)
  WorkerPool->>StorageBackend: upload(file)
  WorkerPool->>Catalog: build manifest(file)
  WorkerPool->>Stats: calculate dataset stats (optional)
  WorkerPool-->>ObjStorage: UploadResult(manifest?, stats_flag)
  ObjStorage->>Catalog: update_snapshot(manifests: Vec<File>)
  ObjStorage->>Stats: handle_stats_sync(if any)
  ObjStorage-->>Staging: cleanup staged files
sequenceDiagram
  participant Producer
  participant Catalog
  participant Partitioner
  Producer->>Catalog: update_snapshot(changes: Vec<File>)
  Catalog->>Partitioner: group_changes_by_partition (parallel)
  Partitioner-->>Catalog: map(partition -> Vec<File>)
  loop per partition
    Catalog->>Catalog: process_single_partition(files) -> manifest_item?
  end
  Catalog-->>Producer: Ok

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Assessment against linked issues

| Objective | Addressed | Explanation |
|---|---|---|
| Allow changing default time partition field to query historical data [#1400] | | The patch infers per-table time_partition from stream config but does not provide a mechanism to select or override the default time column (e.g., _ingested) for query planning or expose that choice via API/UI. |

Assessment against linked issues: Out-of-scope changes

| Code Change | Explanation |
|---|---|
| Concurrent upload pipeline and UploadContext/UploadResult (src/storage/object_storage.rs) | Upload concurrency and manifest/statistics pipeline targets staging/upload behavior; unrelated to selecting query time-field. |
| Staging writer rename-on-collision (src/parseable/staging/writer.rs) | Filesystem renaming to avoid overwrite is an engineering safeguard, not connected to time-field selection for queries. |
| ULID-based arrow file grouping (src/parseable/streams.rs) | Switching to ULID for grouping affects internal file naming/grouping only, not query time-partition selection. |

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • nitisht
  • parmesant

Poem

"I hop through crates on rayon feet,
I stitch each manifest so neat.
I flatten JSON, partition by day,
Ulid seeds show arrowed way.
Hooray—uploads hum, the snapshots play 🐇"

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 64eb704 and f97f6f9.

📒 Files selected for processing (1)
  • src/handlers/http/modal/ingest/ingestor_ingest.rs (0 hunks)
💤 Files with no reviewable changes (1)
  • src/handlers/http/modal/ingest/ingestor_ingest.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin

nikhilsinhaparseable marked this pull request as ready for review August 18, 2025 04:04

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 6

🔭 Outside diff range comments (4)
src/handlers/http/ingest.rs (1)

256-270: OTEL ingestion should also pass effective time_partition

To uniformly enforce time-partition validation for OTEL, compute the same effective time-partition (header override or stream default) and pass it to flatten_and_push_logs.

-                flatten_and_push_logs(
-                    serde_json::from_slice(&body)?,
-                    stream_name,
-                    log_source,
-                    &p_custom_fields,
-                    None,
-                )
+                let time_partition = req
+                    .headers()
+                    .get("x-p-time-partition")
+                    .and_then(|h| h.to_str().ok())
+                    .map(|s| s.to_string())
+                    .or_else(|| PARSEABLE.get_stream(stream_name).ok()?.get_time_partition());
+
+                flatten_and_push_logs(
+                    serde_json::from_slice(&body)?,
+                    stream_name,
+                    log_source,
+                    &p_custom_fields,
+                    time_partition,
+                )
                 .await?;
src/handlers/http/modal/utils/ingest_utils.rs (1)

133-139: Regression: stream-configured time_partition no longer enforced when caller passes None

push_logs now only uses the passed time_partition and no longer derives it from the stream. If call sites forget to pass it (several currently pass None), time-partition validation silently won’t run. To preserve correctness:

  • Compute an effective time partition: per-request override if present, else stream-configured.
  • Use that for both convert_array_to_object and into_event.
 pub async fn push_logs(
     stream_name: &str,
     json: Value,
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
-    time_partition: Option<String>,
+    time_partition: Option<String>,
 ) -> Result<(), PostError> {
     let stream = PARSEABLE.get_stream(stream_name)?;
     let time_partition_limit = PARSEABLE
         .get_stream(stream_name)?
         .get_time_partition_limit();
     let static_schema_flag = stream.get_static_schema_flag();
     let custom_partition = stream.get_custom_partition();
     let schema_version = stream.get_schema_version();
     let p_timestamp = Utc::now();
 
-    let data = convert_array_to_object(
+    // Prefer per-request override, else fall back to the stream-configured time partition
+    let effective_time_partition = time_partition.or_else(|| stream.get_time_partition());
+
+    let data = convert_array_to_object(
         json,
-        time_partition.as_ref(),
+        effective_time_partition.as_ref(),
         time_partition_limit,
         custom_partition.as_ref(),
         schema_version,
         log_source,
     )?;
 
     for json in data {
         let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
         let schema = PARSEABLE.get_stream(stream_name)?.get_schema_raw();
         json::Event { json, p_timestamp }
             .into_event(
                 stream_name.to_owned(),
                 origin_size,
                 &schema,
                 static_schema_flag,
                 custom_partition.as_ref(),
-                time_partition.as_ref(),
+                effective_time_partition.as_ref(),
                 schema_version,
                 StreamType::UserDefined,
                 p_custom_fields,
             )?
             .process()?;
     }
     Ok(())
 }

Also applies to: 149-156, 168-169
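
The fallback rule suggested for push_logs can be isolated into a small helper. This is only a sketch of the precedence logic; `effective_time_partition` is a hypothetical name, and the real code would read the stream-configured value from the stream metadata.

```rust
/// Picks the time partition to enforce: a per-request override wins,
/// otherwise the stream-configured value is used.
/// Returns None only when neither source provides a field name.
fn effective_time_partition(
    request_override: Option<String>,
    stream_configured: Option<String>,
) -> Option<String> {
    request_override.or(stream_configured)
}
```

With a helper like this, a call site that passes None no longer disables validation by accident, because the stream's own setting still applies.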

src/catalog/mod.rs (1)

90-109: Unsafe unwraps in get_file_bounds can crash the node

find(...).unwrap() and stats.as_ref().unwrap() will panic if:

  • the expected column is missing in a file, or
  • stats are absent (e.g., file with schema/metadata drift).

Prefer returning a handled error or skipping such files with a log. Minimal fix:

-fn get_file_bounds(
-    file: &manifest::File,
-    partition_column: String,
-) -> (DateTime<Utc>, DateTime<Utc>) {
-    match file
-        .columns()
-        .iter()
-        .find(|col| col.name == partition_column)
-        .unwrap()
-        .stats
-        .as_ref()
-        .unwrap()
-    {
+fn get_file_bounds(
+    file: &manifest::File,
+    partition_column: String,
+) -> (DateTime<Utc>, DateTime<Utc>) {
+    let col = match file.columns().iter().find(|col| col.name == partition_column) {
+        Some(c) => c,
+        None => {
+            error!("Partition column {partition_column} not found in manifest file {}", file.file_path);
+            // Fallback: treat as empty range to avoid crash; adjust if a different strategy is preferred
+            return (DateTime::from_timestamp_millis(0).unwrap(), DateTime::from_timestamp_millis(0).unwrap());
+        }
+    };
+    let stats = match col.stats.as_ref() {
+        Some(s) => s,
+        None => {
+            error!("Missing stats for column {partition_column} in {}", file.file_path);
+            return (DateTime::from_timestamp_millis(0).unwrap(), DateTime::from_timestamp_millis(0).unwrap());
+        }
+    };
+    match stats {
         column::TypedStatistics::Int(stats) => (
             DateTime::from_timestamp_millis(stats.min).unwrap(),
             DateTime::from_timestamp_millis(stats.max).unwrap(),
         ),
         _ => unreachable!(),
     }
 }

If a hard error is preferred instead of a fallback, we can return a Result and propagate it.
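
A rough sketch of the Result-returning variant is given below. It uses simplified, hypothetical types (`Column` with optional min/max in epoch milliseconds, and a `BoundsError` enum) rather than the real manifest::File and column::TypedStatistics, so it shows the shape of the change rather than a drop-in patch.

```rust
/// Hypothetical, simplified column metadata: a name plus optional
/// (min, max) statistics in epoch milliseconds.
struct Column {
    name: String,
    stats: Option<(i64, i64)>,
}

/// Hypothetical error type describing why bounds could not be derived.
#[derive(Debug, PartialEq)]
enum BoundsError {
    MissingColumn(String),
    MissingStats(String),
}

/// Returns the (min, max) bounds of the partition column, or an error
/// instead of panicking when the column or its stats are absent.
fn file_bounds(columns: &[Column], partition_column: &str) -> Result<(i64, i64), BoundsError> {
    let column = columns
        .iter()
        .find(|col| col.name == partition_column)
        .ok_or_else(|| BoundsError::MissingColumn(partition_column.to_string()))?;
    column
        .stats
        .ok_or_else(|| BoundsError::MissingStats(partition_column.to_string()))
}
```

The caller can then decide per file whether to skip it with a log line or fail the whole snapshot update, instead of having that choice hidden inside the helper.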

src/utils/json/mod.rs (1)

195-217: Document wrapping of array inputs in convert_array_to_object

Call sites and tests confirm that when body is a JSON array and no partitioning is configured, the entire array is returned as a single-element Vec<Value>. Please update the docstring on src/utils/json/mod.rs (around the convert_array_to_object signature, ~line 219) to state explicitly:

  • For array inputs without partitioning, the function wraps the full array into a one-element Vec.

[src/utils/json/mod.rs:219]

🧹 Nitpick comments (14)
src/parseable/streams.rs (1)

1368-1371: Avoid println in tests; assert update is correct.

The updated assertion to expect 3 Arrow files is correct for the same-minute case before compaction. Replace the println with tracing to keep tests clean.

Apply this diff:

-        println!("arrow files: {:?}", staging.arrow_files());
+        tracing::debug!("arrow files: {:?}", staging.arrow_files());
src/parseable/mod.rs (1)

922-931: Custom partition validation limits to a single key.

Matches the documented constraint and simplifies downstream layout. Consider updating the error message to include the provided value for quicker debugging.

Apply this diff:

-        return Err(CreateStreamError::Custom {
-            msg: "Maximum 1 custom partition key is supported".to_string(),
+        return Err(CreateStreamError::Custom {
+            msg: format!("Maximum 1 custom partition key is supported (got: {custom_partition})"),
             status: StatusCode::BAD_REQUEST,
         });
src/storage/field_stats.rs (3)

180-181: Lowering log level to trace may hide actionable failures

These failures previously surfaced at warn!. Moving to trace! will make them easy to miss in production. Consider debug! with field context (stream/field) so operators can correlate silent drops.

- trace!("Failed to execute distinct stats query: {e}");
+ debug!("Failed to execute distinct stats query: {e}");

188-189: Same concern: batch fetch errors downgraded to trace

If the stream is noisy, this will fly under the radar. Suggest debug! and include stream_name/field_name to aid troubleshooting.

- trace!("Failed to fetch batch in distinct stats query: {e}");
+ debug!("Failed to fetch batch in distinct stats query: {e}");

216-217: Field-specific query failure should not be fully silent

trace! here likely hides intermittent schema/data issues. Recommend debug! (or keep warn!) to retain visibility.

- trace!("Failed to execute distinct stats query for field: {field_name}, error: {e}");
+ debug!("Failed to execute distinct stats query for field: {field_name}, error: {e}");
src/handlers/http/ingest.rs (1)

515-545: HTTP status for CustomError

PostError::CustomError maps to 500. For client-side misuse (e.g., bad header combo), 400 is more appropriate. Consider a dedicated variant (e.g., TimePartitionIngestionError) mapped to 400 instead of reusing CustomError.
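
One way to picture the dedicated variant is shown here. `IngestError` and `status_code` are hypothetical and use plain u16 codes; the real PostError has many more variants and its own mapping to the HTTP framework's status type.

```rust
/// Hypothetical error type illustrating a client-error variant
/// kept separate from server-side failures.
#[derive(Debug)]
enum IngestError {
    /// Client-side misuse, e.g. an invalid header combination.
    TimePartitionIngestion(String),
    /// Anything unexpected on the server side.
    Internal(String),
}

impl IngestError {
    /// Maps each variant to an HTTP status code.
    fn status_code(&self) -> u16 {
        match self {
            IngestError::TimePartitionIngestion(_) => 400,
            IngestError::Internal(_) => 500,
        }
    }
}
```

Keeping the client-misuse case in its own variant avoids string matching on a generic custom error to decide between 400 and 500.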

src/catalog/mod.rs (1)

130-158: Parallel grouping is fine; consider determinism only if needed

Using Rayon to group by partition is fine. If deterministic processing order ever matters, add a final sort of keys before iteration. Not required right now.
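
If ordering does become important, one option is to collect the groups into an ordered map so iteration is always by partition key. The sketch below is sequential and std-only (it does not use Rayon), and `group_by_day` with its `(day, path)` string pairs is a hypothetical simplification of the real manifest files.

```rust
use std::collections::BTreeMap;

/// Groups (day, file path) pairs by day. A BTreeMap iterates in key order,
/// so partitions are visited in the same order on every run.
fn group_by_day(files: Vec<(String, String)>) -> BTreeMap<String, Vec<String>> {
    let mut groups: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for (day, path) in files {
        // Create the bucket on first use, then append the file to it.
        groups.entry(day).or_default().push(path);
    }
    groups
}
```

With a parallel grouping step, the same effect can be had by sorting the keys once after the parallel phase, as the comment above suggests.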

src/storage/object_storage.rs (2)

972-999: Minor: spawn_parquet_upload_task doesn’t need to be async

This function only schedules a task; making it fn avoids an unnecessary .await.

-async fn spawn_parquet_upload_task(
+fn spawn_parquet_upload_task(
     join_set: &mut JoinSet<Result<UploadResult, ObjectStorageError>>,
@@
-) {
+) {
@@
-    join_set.spawn(async move {
+    join_set.spawn(async move {
         let _permit = semaphore.acquire().await.expect("semaphore is not closed");
 
         upload_single_parquet_file(store, path, stream_relative_path, stream_name, schema).await
     });
-}
+}

1001-1044: Result collection strategy is fine; consider best-effort behavior on single-file errors

Currently, the first per-file error aborts the entire sync with an error, potentially leaving other tasks running until JoinSet is dropped. If you prefer best-effort uploads, collect all successes and log failures, then proceed to snapshot update. Optional.
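
The best-effort alternative amounts to partitioning the per-file outcomes rather than returning on the first Err. A minimal sketch, with `String` standing in for the real manifest and error types and `collect_best_effort` as a hypothetical name:

```rust
/// Splits per-file upload outcomes into successes and failures instead of
/// aborting on the first error. `String` stands in for the real manifest
/// and error types.
fn collect_best_effort(results: Vec<Result<String, String>>) -> (Vec<String>, Vec<String>) {
    let mut uploaded = Vec::new();
    let mut failed = Vec::new();
    for result in results {
        match result {
            Ok(manifest) => uploaded.push(manifest),
            Err(error) => failed.push(error),
        }
    }
    (uploaded, failed)
}
```

The caller would then log the failures, update the snapshot with the successful manifests only, and leave the failed files in staging for the next sync.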

src/utils/json/mod.rs (5)

66-75: De-duplicate gating logic: reuse this helper in flatten_json_body for consistency

You’ve introduced should_apply_generic_flattening() but flatten_json_body re-implements the same condition inline. Using the helper there avoids drift and keeps behavior consistent across partitioned and non-partitioned paths.

Outside this hunk, consider updating flatten_json_body as:

let mut nested_value = if should_apply_generic_flattening(&body, schema_version, log_source) {
    let flattened_json = generic_flattening(&body)?;
    convert_to_array(flattened_json)?
} else {
    body
};

77-115: Simplify: always iterate the flattened results (remove special-casing len == 1)

Both branches call flatten::flatten on each resulting item; the only difference is avoiding a tiny loop for len == 1. Unify to reduce code and potential divergence.

Apply this diff:

 fn apply_generic_flattening_for_partition(
     element: Value,
     time_partition: Option<&String>,
     time_partition_limit: Option<NonZeroU32>,
     custom_partition: Option<&String>,
 ) -> Result<Vec<Value>, anyhow::Error> {
-    let flattened_json = generic_flattening(&element)?;
-
-    if flattened_json.len() == 1 {
-        // Single result - process normally
-        let mut nested_value = flattened_json.into_iter().next().unwrap();
-        flatten::flatten(
-            &mut nested_value,
-            "_",
-            time_partition,
-            time_partition_limit,
-            custom_partition,
-            true,
-        )?;
-        Ok(vec![nested_value])
-    } else {
-        // Multiple results - process each individually
-        let mut result = Vec::new();
-        for item in flattened_json {
-            let mut processed_item = item;
-            flatten::flatten(
-                &mut processed_item,
-                "_",
-                time_partition,
-                time_partition_limit,
-                custom_partition,
-                true,
-            )?;
-            result.push(processed_item);
-        }
-        Ok(result)
-    }
+    let flattened_json = generic_flattening(&element)?;
+    let mut result = Vec::with_capacity(flattened_json.len());
+    for mut item in flattened_json {
+        flatten::flatten(
+            &mut item,
+            "_",
+            time_partition,
+            time_partition_limit,
+            custom_partition,
+            true,
+        )?;
+        result.push(item);
+    }
+    Ok(result)
 }

149-171: Pre-allocate results; consider parallelizing for large batches

The loop is fine. Two small options:

  • Pre-allocate: result.reserve(arr.len()) as a lower bound.
  • Optional future: use rayon to parallelize per-element processing if CPU-bound and safe (PR mentions rayon was added elsewhere).

Minimal pre-allocation diff:

 fn process_partitioned_array(
@@
 ) -> Result<Vec<Value>, anyhow::Error> {
-    let mut result = Vec::new();
+    let mut result = Vec::new();
+    result.reserve(arr.len()); // lower bound; items can expand

If/when you choose to parallelize, you can adopt rayon and then flatten:

// Outside this hunk, add once: use rayon::prelude::*;
let chunks: Result<Vec<Vec<Value>>, _> = arr
    .into_par_iter()
    .map(|element| {
        process_partitioned_element(
            element,
            time_partition,
            time_partition_limit,
            custom_partition,
            schema_version,
            log_source,
        )
    })
    .collect();

let result = chunks?.into_iter().flatten().collect();

483-515: Add a negative test: missing time-partition field should reject the whole batch

Given the PR objective (reject batch if any event lacks the partition field), add a test where one element omits source_time and assert that convert_array_to_object returns Err.

Add alongside this test:

#[test]
fn test_convert_array_to_object_with_time_partition_missing_field_rejects_batch() {
    let json = json!([
        { "a": "b", "source_time": "2025-08-01T00:00:00.000Z" },
        { "a": "b" } // missing required partition field
    ]);

    let time_partition = Some("source_time".to_string());
    let result = convert_array_to_object(
        json,
        time_partition.as_ref(),
        None,
        None,
        SchemaVersion::V0,
        &crate::event::format::LogSource::default(),
    );

    assert!(result.is_err(), "Batch should be rejected if any event lacks time partition field");
}

517-545: Consider adding a SchemaVersion::V1 case to exercise generic flattening gating

This test validates the new non-partitioned batching, but only for V0. A small V1 case with shallow nesting would exercise should_apply_generic_flattening and guard against regressions in the gating condition.

Example to add near this test:

#[test]
fn test_non_partitioned_v1_generic_flattening_applies() {
    // Single object with an array to be expanded by generic_flattening
    let json = json!({
        "id": 1,
        "items": [ {"x": 10}, {"x": 20} ]
    });

    let result = convert_array_to_object(
        json.clone(),
        None,
        None,
        None,
        SchemaVersion::V1,
        &crate::event::format::LogSource::Json, // matches gating
    );

    assert!(result.is_ok());
    let out = result.unwrap();
    assert_eq!(out.len(), 1, "Non-partitioned path should still return a single batch item");
    // And the single batch item should be an array with expanded objects
    // depending on flatten_json_body's V1 behavior.
    // You can assert shape here based on expected expansion semantics.
}
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 08bece6 and 95aacf6.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (11)
  • Cargo.toml (1 hunks)
  • src/catalog/mod.rs (4 hunks)
  • src/handlers/http/ingest.rs (3 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (4 hunks)
  • src/handlers/mod.rs (1 hunks)
  • src/parseable/mod.rs (2 hunks)
  • src/parseable/staging/writer.rs (2 hunks)
  • src/parseable/streams.rs (2 hunks)
  • src/storage/field_stats.rs (4 hunks)
  • src/storage/object_storage.rs (5 hunks)
  • src/utils/json/mod.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-06-16T09:50:38.636Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1346
File: src/parseable/streams.rs:319-331
Timestamp: 2025-06-16T09:50:38.636Z
Learning: In Parseable's Ingest or Query mode, the node_id is always available because it's generated during server initialization itself, before the get_node_id_string() function in streams.rs would be called. This makes the .expect() calls on QUERIER_META.get() and INGESTOR_META.get() safe in this context.

Applied to files:

  • src/parseable/streams.rs
🧬 Code Graph Analysis (6)
src/utils/json/mod.rs (4)
src/utils/json/flatten.rs (4)
  • has_more_than_max_allowed_levels (335-348)
  • custom_partition (104-104)
  • generic_flattening (269-328)
  • flatten (58-93)
src/parseable/mod.rs (3)
  • custom_partition (751-751)
  • custom_partition (923-923)
  • new (146-158)
src/storage/mod.rs (3)
  • new (192-194)
  • new (205-211)
  • default (215-236)
src/event/format/mod.rs (1)
  • new (126-131)
src/storage/object_storage.rs (5)
src/parseable/streams.rs (1)
  • new (114-131)
src/parseable/mod.rs (3)
  • new (146-158)
  • serde_json (300-300)
  • serde_json (306-306)
src/catalog/manifest.rs (1)
  • create_from_parquet_file (91-124)
src/storage/field_stats.rs (1)
  • calculate_field_stats (79-130)
src/catalog/mod.rs (1)
  • update_snapshot (111-128)
src/handlers/http/modal/utils/ingest_utils.rs (2)
src/utils/json/mod.rs (1)
  • convert_array_to_object (219-256)
src/static_schema.rs (1)
  • custom_partition (73-73)
src/handlers/http/ingest.rs (1)
src/handlers/http/modal/utils/ingest_utils.rs (1)
  • flatten_and_push_logs (51-131)
src/parseable/mod.rs (1)
src/static_schema.rs (1)
  • custom_partition (73-73)
src/catalog/mod.rs (3)
src/storage/object_storage.rs (2)
  • new (83-92)
  • manifest_path (1216-1236)
src/stats.rs (4)
  • event_labels (218-220)
  • event_labels_date (226-232)
  • storage_size_labels_date (234-236)
  • get_current_stats (51-109)
src/catalog/manifest.rs (1)
  • default (69-74)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: coverage
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
🔇 Additional comments (16)
Cargo.toml (1)

126-126: Rayon dependency addition looks good.

No immediate compatibility or security concerns. Keep an eye on binary size and parallelism defaults where you adopt Rayon to avoid oversubscription (Tokio + Rayon).

If you intend to gate Rayon-backed paths, consider making it an optional feature to avoid pulling it into minimal deployments. Want me to draft a feature-gated dependency stanza?

src/parseable/streams.rs (1)

276-276: ULID-based grouping token: sensible choice for collision-free parquet names.

Using a per-run ULID avoids cross-run collisions and keeps grouping deterministic for a single pass.

src/parseable/mod.rs (4)

538-544: Mutual exclusivity between time and custom partitions: good guardrail.

This aligns with the new semantics and avoids ambiguous layouts. Keep it.

Confirm that the ingestion paths reject payloads that contain both fields when stream metadata already sets one (including updates). If needed, I can scan the codebase and propose harmonized checks at ingestion boundaries.
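
As an illustration of the guardrail discussed above, here is a minimal sketch of a mutual-exclusivity check. The function name `validate_partition_config` and the error text are hypothetical and not taken from the PR; the actual check in src/parseable/mod.rs may be shaped differently.

```rust
/// Hypothetical helper: rejects a stream configuration that sets both
/// a time partition and a custom partition.
fn validate_partition_config(
    time_partition: Option<&str>,
    custom_partition: Option<&str>,
) -> Result<(), String> {
    match (time_partition, custom_partition) {
        // Both set: the layout would be ambiguous, so refuse it.
        (Some(t), Some(c)) => Err(format!(
            "cannot set both time partition '{t}' and custom partition '{c}'"
        )),
        // Zero or one partition kind configured: acceptable.
        _ => Ok(()),
    }
}
```

The same function could be called at ingestion boundaries so that creation and update paths share one rule.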


871-900: Static schema validation path is correct.

Passing time_partition and custom_partition through to schema conversion ensures early validation of field presence for static schema streams.


902-920: Time-partition limit parsing covers format and zero checks.

Error messages are clear and actionable. No further changes needed.
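
For readers unfamiliar with the limit format, the following is a rough sketch of what "format and zero checks" can look like for a value such as `30d`. It assumes only a day suffix is accepted; `parse_time_partition_limit` and its messages are illustrative names, not the PR's actual code.

```rust
use std::num::NonZeroU32;

/// Hypothetical parser for a limit like "30d" (a whole number of days).
fn parse_time_partition_limit(raw: &str) -> Result<NonZeroU32, String> {
    // Format check: this sketch accepts only a trailing 'd'.
    let days = raw
        .strip_suffix('d')
        .ok_or_else(|| format!("limit '{raw}' must end with 'd', e.g. 30d"))?;
    // Format check: the remainder must be an unsigned integer.
    let days: u32 = days
        .parse()
        .map_err(|_| format!("limit '{raw}' must be a whole number of days"))?;
    // Zero check: a limit of 0 days is rejected.
    NonZeroU32::new(days).ok_or_else(|| "limit must be greater than zero".to_string())
}
```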


531-576: Double-check end-to-end enforcement that every event carries the specified time-partition field.

The PR states batches should be rejected if any event lacks the specified time-partition field. This file handles metadata and creation, but not the ingest-time validation.

Would you like me to verify and, if needed, add a guard in the ingest pipeline (e.g., in flatten_and_push_logs/push_logs) to enforce this invariant at batch boundaries?

src/handlers/mod.rs (1)

29-35: Making header constants public is appropriate.

This enables cross-module access to time-partition and related headers without duplication. Keep the constants centralized here.

src/storage/field_stats.rs (1)

126-127: Signature sync with new time_partition param looks correct

Passing None into flatten_and_push_logs aligns with the updated signature and preserves the previous behavior for dataset-stats.

src/handlers/http/ingest.rs (3)

118-133: Good: explicit stream retrieval and error propagation

Matching on PARSEABLE.get_stream and propagating failures removes the prior ambiguity. The OTEL format compatibility check is preserved.


410-425: Same issue in post_event: hard block + not passing time_partition

Mirror the fix applied in ingest: remove the block and pass effective time partition to flatten_and_push_logs.

-    if stream.get_time_partition().is_some() {
-        return Err(PostError::CustomError(
-            "Ingestion is not allowed to stream with time partition".to_string(),
-        ));
-    }
-
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
+    let time_partition = req
+        .headers()
+        .get("x-p-time-partition")
+        .and_then(|h| h.to_str().ok())
+        .map(|s| s.to_string())
+        .or_else(|| stream.get_time_partition());
+
+    flatten_and_push_logs(
+        json,
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+        time_partition,
+    )
+    .await?;
⛔ Skipped due to learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

55-67: Time-partition headers are already handled at stream creation
The ingest handler pulls the stream’s time_partition and time_partition_limit from its metadata (via PARSEABLE.get_stream…get_time_partition_limit()) and passes them into flatten_and_push_logs for validation. The raw X-P-Time-Partition and X-P-Time-Partition-Limit headers are parsed when the stream is created or updated in

  • src/handlers/http/modal/utils/logstream_utils.rs (see TIME_PARTITION_KEY / TIME_PARTITION_LIMIT_KEY in PutStreamHeaders)

No changes to the ingest endpoint are required.

src/handlers/http/modal/utils/ingest_utils.rs (1)

51-57: Public API extension is sensible

Adding time_partition: Option<String> to flatten_and_push_logs keeps backward compatibility and enables per-request overrides.

src/catalog/mod.rs (1)

111-128: Batch-oriented snapshot update: good structure and early return

Switching to a Vec of changes and early exit is clean and unlocks per-partition processing.

src/storage/object_storage.rs (1)

75-93: UploadContext is a good encapsulation

The context wrapper simplifies passing stream-derived info across the pipeline.

src/utils/json/mod.rs (2)

126-145: Per-element generic-flatten gating may yield mixed shapes within a single batch

In partitioned mode, generic_flattening is decided per element, whereas the non-partitioned path gates on the whole body. This can produce heterogeneous outputs (some events expanded via generic flattening while others aren’t) within the same batch if depth varies across elements. If downstream expects a uniform schema per batch, this can be surprising.

Please confirm that this behavioral change is intentional and acceptable for consumers. If uniformity is required, consider evaluating the gating once for the entire array and applying the same policy to all elements in that batch.


219-256: Routing logic LGTM

The partition/non-partition routing is clear, and validation is always enabled in the partitioned path. This aligns with rejecting the entire batch when a single event violates partition requirements.
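
To make the all-or-nothing behaviour concrete, here is a small sketch of batch-level validation. It models an event as a flat string map, which is a simplification; `Event` and `validate_batch` are hypothetical names, and the real path works on serde_json values.

```rust
use std::collections::HashMap;

/// Simplified stand-in for a flattened JSON event (assumption for this sketch).
type Event = HashMap<String, String>;

/// Returns an error as soon as one event lacks the time-partition field,
/// so the caller can reject the whole batch.
fn validate_batch(events: &[Event], time_partition: &str) -> Result<(), String> {
    match events.iter().position(|e| !e.contains_key(time_partition)) {
        Some(idx) => Err(format!(
            "event at index {idx} is missing time-partition field '{time_partition}'"
        )),
        None => Ok(()),
    }
}
```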

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (7)
src/query/mod.rs (3)

582-615: Always enforce server time bounds per table; don’t skip just because “some” time filter exists

Right now, if any time filter exists on the scan, no server-side time window is added. This can miss adding the missing bound (e.g., only >= start is present, but < end is not), causing larger-than-necessary scans. It’s safe and simpler to always apply the request’s [start, end) bounds; redundant predicates are benign and DF can still optimize.

Apply this diff to always add the filters and avoid the “any-time-filter” gate:

-                let mut new_filters = vec![];
-                if !table_contains_any_time_filters(&table, time_partition.as_ref()) {
-                    let default_timestamp = DEFAULT_TIMESTAMP_KEY.to_string();
-                    let time_column = time_partition.as_ref().unwrap_or(&default_timestamp);
-
-                    // Create time filters with table-qualified column names
-                    let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(
-                        start_time,
-                    ))
-                    .binary_expr(Expr::Column(Column::new(
-                        Some(table.table_name.to_owned()),
-                        time_column.clone(),
-                    )));
-
-                    let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(
-                        end_time,
-                    ))
-                    .binary_expr(Expr::Column(Column::new(
-                        Some(table.table_name.to_owned()),
-                        time_column.clone(),
-                    )));
-
-                    new_filters.push(start_time_filter);
-                    new_filters.push(end_time_filter);
-                }
+                // Always add the server-side time window to constrain scans.
+                let default_timestamp = DEFAULT_TIMESTAMP_KEY.to_string();
+                let time_column = time_partition.as_ref().unwrap_or(&default_timestamp);
+
+                let mut new_filters = vec![];
+                // Create time filters with table-qualified column names
+                let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
+                    .binary_expr(Expr::Column(Column::new(
+                        Some(table.table_name.to_owned()),
+                        time_column.clone(),
+                    )));
+
+                let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
+                    .binary_expr(Expr::Column(Column::new(
+                        Some(table.table_name.to_owned()),
+                        time_column.clone(),
+                    )));
+
+                new_filters.push(start_time_filter);
+                new_filters.push(end_time_filter);

619-623: Avoid unwrap on Filter::try_new to prevent panics; fall back gracefully

If a stream is misconfigured or the time column is absent, Filter::try_new can error. Panicking here crashes query execution. Return the scan unchanged on error.

-                    let filter =
-                        Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table)))
-                            .unwrap();
-                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
+                    // Clone the scan so the original is still available in the error arm.
+                    match Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table.clone()))) {
+                        Ok(filter) => Ok(Transformed::yes(LogicalPlan::Filter(filter))),
+                        Err(_) => Ok(Transformed::no(LogicalPlan::TableScan(table))),
+                    }

641-657: Detect time filters more robustly (check both sides and match the intended column)

Current check only inspects the left-hand side and compares just the column name. Improve robustness by checking both sides and still matching the time column name.

-    table
-        .filters
-        .iter()
-        .filter_map(|x| {
-            if let Expr::BinaryExpr(binexpr) = x {
-                Some(binexpr)
-            } else {
-                None
-            }
-        })
-        .any(|expr| {
-            matches!(&*expr.left, Expr::Column(Column { name, .. })
-            if name == time_column)
-        })
+    table
+        .filters
+        .iter()
+        .filter_map(|x| {
+            if let Expr::BinaryExpr(binexpr) = x {
+                Some(binexpr)
+            } else {
+                None
+            }
+        })
+        .any(|expr| {
+            match (&*expr.left, &*expr.right) {
+                (Expr::Column(Column { name, .. }), _) if name == time_column => true,
+                (_, Expr::Column(Column { name, .. })) if name == time_column => true,
+                _ => false,
+            }
+        })
src/handlers/airplane.rs (1)

138-154: Ensure streams are registered in Query/Prism modes before executing

Airplane path resolves table names but doesn’t create/attach streams in-memory like the HTTP path does. Given GlobalSchemaProvider.table() expects streams to exist, consider registering them (especially in Query/Prism) to avoid “stream not found” errors.

Proposed addition (outside the changed hunk) before building the query:

use crate::handlers::http::query::create_streams_for_distributed;

// After `let streams = resolve_stream_names(&ticket.query)?;`
if PARSEABLE.options.mode == crate::option::Mode::Query || PARSEABLE.options.mode == crate::option::Mode::Prism {
    if let Err(e) = create_streams_for_distributed(streams.clone()).await {
        return Err(Status::failed_precondition(format!("Failed to prepare streams: {e}")));
    }
}

This mirrors the pattern used in the HTTP handler and aligns with prior guidance to verify stream existence in both query and standalone modes.

src/handlers/http/query.rs (3)

207-216: Avoid potential panic on empty table list when labeling metrics

Indexing with [0] will panic if tables becomes empty upstream (e.g., an edge-case query with no table scan). Prefer safe access with a descriptive error.

 async fn handle_non_streaming_query(
     query: LogicalQuery,
     table_name: Vec<String>,
     query_request: &Query,
     time: Instant,
 ) -> Result<HttpResponse, QueryError> {
-    let first_table_name = table_name[0].clone();
+    let first_table_name = table_name
+        .first()
+        .cloned()
+        .ok_or(QueryError::MalformedQuery("No table name found in query"))?;
     let (records, fields) = execute(query, query_request.streaming).await?;
@@
     QUERY_EXECUTE_TIME
         .with_label_values(&[&first_table_name])
         .observe(time);

Also applies to: 228-230
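
The difference between indexing and checked access can be shown in isolation. This is a standalone sketch using a plain `String` error instead of `QueryError`; `first_table_name` is a hypothetical helper name.

```rust
/// Returns the first table name, or an error instead of panicking
/// when the list is empty.
fn first_table_name(tables: &[String]) -> Result<String, String> {
    tables
        .first()
        .cloned()
        .ok_or_else(|| "No table name found in query".to_string())
}
```

With `tables[0]`, an empty slice would abort the request with a panic; here the caller receives an error it can map to a 4xx response.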


260-266: Same here: safe first-table extraction for streaming path

Mirror the non-streaming fix to avoid panics and keep labeling robust.

 async fn handle_streaming_query(
     query: LogicalQuery,
     table_name: Vec<String>,
     query_request: &Query,
     time: Instant,
 ) -> Result<HttpResponse, QueryError> {
-    let first_table_name = table_name[0].clone();
+    let first_table_name = table_name
+        .first()
+        .cloned()
+        .ok_or(QueryError::MalformedQuery("No table name found in query"))?;
     let (records_stream, fields) = execute(query, query_request.streaming).await?;
@@
     QUERY_EXECUTE_TIME
         .with_label_values(&[&first_table_name])
         .observe(time);

Also applies to: 276-278


201-206: Docs drift: parameter now carries multiple tables

The doc comments still describe a single table_name, but the functions now accept Vec<String>. Update the docs to prevent confusion.

Also applies to: 250-258

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 95aacf6 and 7af280d.

📒 Files selected for processing (5)
  • src/alerts/alerts_utils.rs (1 hunks)
  • src/handlers/airplane.rs (1 hunks)
  • src/handlers/http/query.rs (6 hunks)
  • src/query/mod.rs (7 hunks)
  • src/query/stream_schema_provider.rs (0 hunks)
💤 Files with no reviewable changes (1)
  • src/query/stream_schema_provider.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/query/mod.rs
🧬 Code Graph Analysis (4)
src/alerts/alerts_utils.rs (1)
src/query/mod.rs (2)
  • execute (78-92)
  • execute (179-212)
src/handlers/airplane.rs (1)
src/query/mod.rs (2)
  • execute (78-92)
  • execute (179-212)
src/query/mod.rs (4)
src/query/stream_schema_provider.rs (1)
  • table (87-100)
src/parseable/streams.rs (1)
  • new (114-131)
src/parseable/mod.rs (1)
  • new (146-158)
src/metadata.rs (1)
  • new (97-134)
src/handlers/http/query.rs (2)
src/query/mod.rs (2)
  • execute (78-92)
  • execute (179-212)
src/query/stream_schema_provider.rs (1)
  • table (87-100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (4)
src/handlers/airplane.rs (1)

208-210: Signature update LGTM

Swapping to execute(query, false) matches the new API and keeps the downstream logic intact.

src/alerts/alerts_utils.rs (1)

110-113: Signature update LGTM

The switch to execute(query, false) aligns with the refactored public API. Upstream stream preparation is already handled earlier in this function.

src/handlers/http/query.rs (2)

97-99: Signature update LGTM

The change to execute(query, false) matches the new query API and the surrounding logic remains consistent.


128-132: Count path: safe first-table extraction LGTM

You guard the empty case with a proper error; extracting the first table name for counts is fine.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🔭 Outside diff range comments (2)
src/handlers/http/modal/utils/ingest_utils.rs (2)

138-170: Regression: stream-configured time partition no longer takes effect; restore fallback and minor perf cleanups

push_logs used to derive time_partition from the stream. After this change, it only uses the passed-in value; with call sites passing None, time partition is silently ignored and validation never runs.

Fix:

  • Compute effective_time_partition = time_partition.or_else(|| stream.get_time_partition()).
  • Use stream for time_partition_limit and schema to avoid repeated lookups.
 pub async fn push_logs(
     stream_name: &str,
     json: Value,
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
     time_partition: Option<String>,
 ) -> Result<(), PostError> {
-    let stream = PARSEABLE.get_stream(stream_name)?;
-    let time_partition_limit = PARSEABLE
-        .get_stream(stream_name)?
-        .get_time_partition_limit();
+    let stream = PARSEABLE.get_stream(stream_name)?;
+    let time_partition_limit = stream.get_time_partition_limit();
     let static_schema_flag = stream.get_static_schema_flag();
     let custom_partition = stream.get_custom_partition();
     let schema_version = stream.get_schema_version();
     let p_timestamp = Utc::now();
 
-    let data = convert_array_to_object(
-        json,
-        time_partition.as_ref(),
-        time_partition_limit,
-        custom_partition.as_ref(),
-        schema_version,
-        log_source,
-    )?;
+    // Fallback to stream-configured time partition if caller did not supply one
+    let effective_time_partition = time_partition.or_else(|| stream.get_time_partition());
+
+    let data = convert_array_to_object(
+        json,
+        effective_time_partition.as_ref(),
+        time_partition_limit,
+        custom_partition.as_ref(),
+        schema_version,
+        log_source,
+    )?;
 
+    let schema = stream.get_schema_raw();
     for json in data {
         let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
-        let schema = PARSEABLE.get_stream(stream_name)?.get_schema_raw();
         json::Event { json, p_timestamp }
             .into_event(
                 stream_name.to_owned(),
                 origin_size,
                 &schema,
                 static_schema_flag,
                 custom_partition.as_ref(),
-                time_partition.as_ref(),
+                effective_time_partition.as_ref(),
                 schema_version,
                 StreamType::UserDefined,
                 p_custom_fields,
             )?
             .process()?;
     }
     Ok(())
 }
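
The fallback rule in the diff above can be sketched on its own. The helper name `effective_time_partition` is hypothetical, and treating a blank caller-supplied value as "not supplied" is an assumption of this sketch rather than something the PR specifies.

```rust
/// Picks the caller-supplied time partition if present, otherwise the
/// stream-configured one. Blank values are treated as absent (assumption).
fn effective_time_partition(
    requested: Option<&str>,
    stream_default: Option<String>,
) -> Option<String> {
    requested
        .map(str::trim)
        .filter(|v| !v.is_empty())
        .map(str::to_owned)
        .or(stream_default)
}
```

Computing this once near the top of the function keeps the value passed to flattening and the value passed to event construction in agreement.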

51-57: Action Required: Wire time_partition Across All flatten_and_push_logs Call-Sites

The new signature for flatten_and_push_logs takes five parameters, including p_custom_fields and an explicit time_partition, so every call site must pass all of them rather than defaulting to None or omitting arguments. Update the following locations:

src/handlers/http/ingest.rs

  • Line 133 and Line 408: currently calling
    flatten_and_push_logs(..., &p_custom_fields, None).await?;
    → Replace None with the actual time_partition value you derive.

src/handlers/http/modal/ingest/ingestor_ingest.rs

  • Line 41: calling with only three arguments (req, body, stream_name)
    → Expand to five arguments:
    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, time_partition).await?;

src/storage/field_stats.rs

  • Line 121: verify that this call passes both a &HashMap<String, String> and the correct time_partition instead of relying on defaults.

The internal push_logs calls in src/handlers/http/modal/utils/ingest_utils.rs remain unaffected, as they do not accept a time_partition.

♻️ Duplicate comments (4)
src/handlers/http/ingest.rs (2)

127-134: Remove hard block and plumb an effective time_partition (header override → stream default) into flattening/push path

Blocking ingestion when a stream has a time partition contradicts the PR objective (enable custom time partitions) and disables validation of the required field at the right layer. Let flatten_and_push_logs/push_logs enforce “field present in every event” by providing the effective time_partition.

Also note: returning PostError::CustomError maps to 500; this is a client-side error and should be 400. Prefer a typed error variant that maps to BAD_REQUEST.

Apply the following changes:

  • Remove the hard block.
  • Compute time_partition from header x-p-time-partition with fallback to stream.get_time_partition().
  • Pass it to flatten_and_push_logs.
-    if stream.get_time_partition().is_some() {
-        return Err(PostError::CustomError(
-            "Ingestion is not allowed to stream with time partition".to_string(),
-        ));
-    }
-
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
+    // Prefer header override, else fall back to stream-configured time partition
+    let time_partition = req
+        .headers()
+        .get("x-p-time-partition")
+        .and_then(|h| h.to_str().ok())
+        .map(|s| s.to_string())
+        .or_else(|| stream.get_time_partition());
+
+    flatten_and_push_logs(
+        json,
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+        time_partition,
+    )
+    .await?;

400-409: Same issue as ingest(): remove hard block and pass effective time_partition

This block prevents the feature altogether on this endpoint and returns a 500 via CustomError. Remove the guard, derive the effective time partition from header → stream default, and pass it to flatten_and_push_logs.

-    let stream = validate_stream_for_ingestion(&stream_name)?;
-
-    if stream.get_time_partition().is_some() {
-        return Err(PostError::CustomError(
-            "Ingestion is not allowed to stream with time partition".to_string(),
-        ));
-    }
-
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
+    let stream = validate_stream_for_ingestion(&stream_name)?;
+
+    // Prefer header override, else fall back to stream-configured time partition
+    let time_partition = req
+        .headers()
+        .get("x-p-time-partition")
+        .and_then(|h| h.to_str().ok())
+        .map(|s| s.to_string())
+        .or_else(|| stream.get_time_partition());
+
+    flatten_and_push_logs(
+        json,
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+        time_partition,
+    )
+    .await?;
src/storage/object_storage.rs (1)

145-166: Fix brittle date extraction that can panic.

The current implementation uses unchecked array indexing which will panic if the filename format deviates from expectations. This is a known issue from previous reviews.

Apply this diff to handle malformed filenames gracefully:

 fn update_storage_metrics(
     path: &std::path::Path,
     stream_name: &str,
     filename: &str,
 ) -> Result<(), ObjectStorageError> {
-    let mut file_date_part = filename.split('.').collect::<Vec<&str>>()[0];
-    file_date_part = file_date_part.split('=').collect::<Vec<&str>>()[1];
+    // Extract date from filename format: "date=YYYY-MM-DD.suffix"
+    let file_date_part = filename
+        .split('.')
+        .next()
+        .and_then(|part| part.split('=').nth(1))
+        .ok_or_else(|| {
+            ObjectStorageError::Custom(format!("Invalid filename format: {}", filename))
+        })?;
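
The extraction logic in this suggestion can be exercised in isolation. The sketch below assumes the "date=YYYY-MM-DD.suffix" filename shape named in the comment above and returns `Option` rather than `ObjectStorageError`; `file_date_part` as a free function is a hypothetical name.

```rust
/// Extracts the value after '=' in the first dot-separated segment,
/// e.g. "date=2025-08-11.hour=10.parquet" -> "2025-08-11".
/// Returns None instead of panicking on malformed names.
fn file_date_part(filename: &str) -> Option<&str> {
    filename
        .split('.')
        .next()
        .and_then(|part| part.split('=').nth(1))
}
```

The unchecked version indexes `[1]` on the '='-split and panics for a name such as `nodate.parquet`; the iterator form yields `None` for that input.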
src/catalog/mod.rs (1)

341-354: Wrong meta used when creating a manifest for non-matching partition.

When the manifest path doesn't match the current node's pattern (line 341-354), the code uses ObjectStoreFormat::default() instead of the actual stream metadata. This loses stream-specific settings like time_partition.

Apply this diff to use the correct metadata:

     } else {
         // Create new manifest for different partition
         create_manifest(
             partition_lower,
             partition_changes,
             storage,
             stream_name,
             false,
-            ObjectStoreFormat::default(),
+            meta.clone(),
             events_ingested,
             ingestion_size,
             storage_size,
         )
         .await
     }
🧹 Nitpick comments (7)
src/handlers/http/ingest.rs (3)

121-126: Avoid discarding the returned Stream; reuse it to derive effective time partition

You fetch stream via validate_stream_for_ingestion but don’t use it afterward. It’s useful to:

  • Avoid an extra lookup later.
  • Compute the effective time partition (header override or stream default) once and pass it downstream.

This also helps eliminate subsequent duplicate checks.


251-275: OTEL endpoints: align validation/gating strategy across all ingestion paths

Currently:

  • ingest()/post_event block time-partitioned streams via validation helper + explicit guard.
  • OTEL endpoints don’t apply the same gating and pass None for partition.

Decide one consistent policy:

  • If the feature is enabled (per this PR), remove blocks and rely on downstream validation once time_partition is provided.
  • If it’s gated (enterprise-only), enforce the same check in OTEL paths too (ideally centralized), and return a 4xx.

I can draft a follow-up to centralize this check.

Also applies to: 287-297, 315-337


52-92: Missing support for X-P-Time-Partition-Limit header

PR description includes X-P-Time-Partition-Limit, but no code reads or forwards it; push_logs always uses the stream-configured limit. If per-request override is desired:

  • Parse the header here.
  • Plumb an optional limit through to push_logs and down to convert_array_to_object.

I can provide a patch that threads Option<NonZeroU32> through the APIs with minimal churn.

src/handlers/http/modal/utils/ingest_utils.rs (2)

47-49: Exclude control headers from custom fields to avoid polluting the dataset

Headers like x-p-time-partition and x-p-time-partition-limit are control signals; they shouldn’t be stored as event fields. Add them to IGNORE_HEADERS.

-const IGNORE_HEADERS: [&str; 3] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY, EXTRACT_LOG_KEY];
+const IGNORE_HEADERS: [&str; 5] = [
+    STREAM_NAME_HEADER_KEY,
+    LOG_SOURCE_KEY,
+    EXTRACT_LOG_KEY,
+    "x-p-time-partition",
+    "x-p-time-partition-limit",
+];

Also applies to: 192-234
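
A minimal sketch of the filtering idea, assuming headers arrive as name/value pairs and that names are compared case-insensitively. `CONTROL_HEADERS` and `custom_fields_from_headers` are hypothetical names; the real helper in ingest_utils.rs may apply additional rules.

```rust
use std::collections::HashMap;

/// Control headers that should never become event fields (per the suggestion above).
const CONTROL_HEADERS: [&str; 2] = ["x-p-time-partition", "x-p-time-partition-limit"];

/// Builds custom fields from headers, dropping control headers.
fn custom_fields_from_headers(headers: &[(&str, &str)]) -> HashMap<String, String> {
    headers
        .iter()
        // Normalise names so "X-P-Time-Partition" is matched too.
        .map(|(name, value)| (name.to_ascii_lowercase(), value.to_string()))
        .filter(|(name, _)| !CONTROL_HEADERS.iter().any(|h| *h == name.as_str()))
        .collect()
}
```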


294-350: Tests: add coverage for time-partition enforcement and header overrides

Please add tests that:

  • Ingest with x-p-time-partition: _ingested succeeds when all records have _ingested; fails with BAD_REQUEST when any record is missing it.
  • Stream-configured time partition applies when header is absent (fallback path).
  • x-p-time-partition/x-p-time-partition-limit are not inserted into custom fields.

I can draft these if helpful.

src/storage/object_storage.rs (2)

1071-1083: Consider awaiting the spawned task or using a different pattern.

The stats sync spawns a detached task (line 1075) that may outlive the parent operation. If the parent crashes or shuts down, this task might not complete. Consider either awaiting it or using a more robust background task management system.

 async fn handle_stats_sync(stats_calculated: bool) {
     if stats_calculated {
         // perform local sync for the `pstats` dataset
-        task::spawn(async move {
+        let handle = task::spawn(async move {
             if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME)
                 && let Err(err) = stats_stream.flush_and_convert(false, false)
             {
                 error!("Failed in local sync for dataset stats stream: {err}");
             }
         });
+        // Optionally await with a timeout to ensure completion
+        // let _ = tokio::time::timeout(Duration::from_secs(30), handle).await;
     }
 }

103-143: Add Context to Manifest Creation Errors

Errors from catalog::create_from_parquet_file currently bubble up as ObjectStorageError::Invalid without any reference to which file or stream failed. Adding context will make debugging much easier. For example:

• In src/storage/object_storage.rs, add at the top:

use anyhow::Context;

• Around line 133, change:

-    let manifest = catalog::create_from_parquet_file(absolute_path, &path)?;
+    let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &path)
+        .with_context(|| {
+            format!(
+                "failed to create manifest for parquet file {:?} (stream: {})",
+                filename, stream_name
+            )
+        })?;

This way, if manifest creation fails, the error chain will include exactly which file and stream caused the problem.
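
If pulling in anyhow is not desirable, the same effect can be approximated with the standard library alone. The following sketch uses a hypothetical `ManifestError` type and a `String` cause for brevity; it is not the project's error type.

```rust
use std::fmt;

/// Hypothetical error that records which file and stream failed.
#[derive(Debug)]
struct ManifestError {
    filename: String,
    stream: String,
    cause: String,
}

impl fmt::Display for ManifestError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "failed to create manifest for parquet file {:?} (stream: {}): {}",
            self.filename, self.stream, self.cause
        )
    }
}

/// Attaches file and stream context to a failing result.
fn with_manifest_context<T>(
    result: Result<T, String>,
    filename: &str,
    stream: &str,
) -> Result<T, ManifestError> {
    result.map_err(|cause| ManifestError {
        filename: filename.to_owned(),
        stream: stream.to_owned(),
        cause,
    })
}
```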

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7af280d and 674eed5.

📒 Files selected for processing (4)
  • src/catalog/mod.rs (4 hunks)
  • src/handlers/http/ingest.rs (4 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (7 hunks)
  • src/storage/object_storage.rs (5 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
📚 Learning: 2025-08-18T12:30:23.342Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/ingest.rs
  • src/handlers/http/modal/utils/ingest_utils.rs
🧬 Code Graph Analysis (3)
src/storage/object_storage.rs (6)
src/parseable/mod.rs (3)
  • new (146-158)
  • serde_json (300-300)
  • serde_json (306-306)
src/parseable/streams.rs (1)
  • new (114-131)
src/storage/s3.rs (2)
  • from (850-858)
  • from (862-864)
src/catalog/manifest.rs (1)
  • create_from_parquet_file (91-124)
src/storage/field_stats.rs (1)
  • calculate_field_stats (79-130)
src/catalog/mod.rs (1)
  • update_snapshot (111-128)
src/handlers/http/ingest.rs (2)
src/handlers/http/modal/utils/ingest_utils.rs (2)
  • validate_stream_for_ingestion (271-292)
  • flatten_and_push_logs (51-131)
src/validator.rs (1)
  • stream_name (33-68)
src/handlers/http/modal/utils/ingest_utils.rs (1)
src/utils/json/mod.rs (1)
  • convert_array_to_object (219-256)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (15)
src/handlers/http/ingest.rs (1)

32-32: Good reuse: centralizing ingestion validation is a step in the right direction

Importing and using validate_stream_for_ingestion reduces repeated validation logic across handlers.

src/handlers/http/modal/utils/ingest_utils.rs (4)

26-26: Arc import looks good

Needed for returning Arc<Stream> from the new validator.


42-42: Stream type import is appropriate

Importing Stream for the validator return type is correct.


51-57: API shape is fine, but call sites passing None disable partitioning/validation

Adding time_partition: Option<String> is good. However, all current call sites pass None, which, combined with the change in push_logs (see below), effectively disables time-partitioning and the “every event must contain the field” validation.

Action: ensure handlers compute and pass an effective time_partition (header override → stream default), or restore stream fallback inside push_logs.


67-127: OK to clone the Option for OTEL branches

Cloning Option<String> for per-record pushes is acceptable. Strings are small here; the cost is negligible compared to serialization work.

src/storage/object_storage.rs (5)

75-93: LGTM! Well-structured context management for uploads.

The UploadContext struct and its constructor properly encapsulate stream-related data needed for upload operations. Good design choice to preload the custom partition and schema to avoid repeated locking.


942-966: Good use of bounded concurrency with semaphore.

The implementation properly limits concurrent uploads to 100 using a semaphore, which prevents resource exhaustion. The JoinSet usage for managing tasks is also appropriate.
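
The bounded-concurrency pattern praised here can be illustrated without an async runtime. The review mentions a semaphore and a JoinSet; the sketch below swaps in standard-library threads and a hand-rolled counting semaphore, so `Semaphore`, `run_bounded`, and the sleep that stands in for an upload are all hypothetical names and not the PR's code. It only shows that the number of tasks running at once never exceeds the permit count.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore built from a mutex and a condition variable.
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Blocks until a permit is free, then takes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiting thread.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Runs `jobs` placeholder tasks with at most `limit` in flight at once
/// and returns the peak concurrency that was observed.
/// Assumption: `limit` is at least 1, otherwise every task would block forever.
fn run_bounded(jobs: usize, limit: usize) -> usize {
    let semaphore = Arc::new(Semaphore::new(limit));
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..jobs)
        .map(|_| {
            let semaphore = Arc::clone(&semaphore);
            let running = Arc::clone(&running);
            let peak = Arc::clone(&peak);
            thread::spawn(move || {
                semaphore.acquire();
                let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                // Stand-in for the actual upload work.
                thread::sleep(Duration::from_millis(2));
                running.fetch_sub(1, Ordering::SeqCst);
                semaphore.release();
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    peak.load(Ordering::SeqCst)
}
```

Calling `run_bounded(20, 4)` starts twenty workers but lets only four hold a permit at a time; the returned peak is therefore between 1 and 4.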


1054-1069: LGTM! Clean schema processing logic.

The schema file processing properly reads, commits, and cleans up files with appropriate error handling for the cleanup phase.


168-186: Review error-handling strategy for calculate_stats_if_enabled

Currently, any failure inside calculate_stats_if_enabled is traced and results in false, which your upload logic treats as “no stats calculated” rather than an error:

  • In src/storage/object_storage.rs (around line 168), the function always returns false on Err(_).
  • At its call site (around line 175), you do let stats_calculated = calculate_stats_if_enabled(...).await; and include that bool in UploadResult.

If missing stats is acceptable, this silent failure may be fine. If dataset stats are critical to downstream logic, you should surface the error (e.g. change the signature to -> Result<bool, Error>, propagate the error, or otherwise alert the caller).

Please verify whether silent fallback to false aligns with your requirements or if errors should be propagated instead.


1085-1099: No call sites were found for stream_relative_path (rg -n "stream_relative_path" returns no results). The expected format of filename is therefore unclear: there are no existing .parquet fixtures, no call sites, and no tests to confirm the dot-count logic. Please verify manually that:

  • Filenames always use . as a separator between all partition fields (and only between those fields).
  • The number of dots in filename matches 3 by default (e.g. year.month.day.suffix) and 3 + custom_partition_list.len() when custom partitions are provided.

If you’re not certain of these invariants, add tests (or example fixtures) covering:

  • No custom partitions (3 dots → 3 folders)
  • 1+ custom partitions (e.g. 4 dots → 4 folders)
  • Filenames with extra dots (e.g. in a name component)
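
The dot-count rule described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: `relative_path_sketch` is a hypothetical name, the filename layout (`date=….hour=….minute=….<rest>`) is assumed, and custom partitions are assumed to arrive as a comma-separated field list. It replaces the first `3 + n` dots with `/`, where `n` is the number of custom partition fields.

```rust
/// Hypothetical helper illustrating the dot-count rule from the review comment.
/// The first three dots (plus one per custom partition field) become path separators.
fn relative_path_sketch(
    stream_name: &str,
    filename: &str,
    custom_partition: Option<&str>,
) -> String {
    // Count the comma-separated custom partition fields, ignoring empty entries.
    let extra = custom_partition
        .map(|fields| fields.split(',').filter(|f| !f.trim().is_empty()).count())
        .unwrap_or(0);
    // `replacen` only touches the first `3 + extra` dots; later dots are kept.
    let suffix = filename.replacen('.', "/", 3 + extra);
    format!("{stream_name}/{suffix}")
}
```

With no custom partition, `date=2025-08-18.hour=10.minute=05.host.data.parquet` keeps `host.data.parquet` as the file name. The risk the comment points at shows up when a partition value itself contains a dot: with custom partition `region` and a value of `eu.west`, the fourth dot falls inside the value and the path is split in the wrong place.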
src/catalog/mod.rs (5)

111-128: Good optimization with early return for empty changes.

The function properly checks for empty changes and returns early, avoiding unnecessary processing. The batch processing approach is well-structured.


130-158: Excellent use of Rayon for parallel partition grouping.

The parallel implementation using Rayon's into_par_iter() with proper fold and reduce operations is well-designed for performance. The HashMap accumulation pattern is correct.
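
For readers unfamiliar with the fold-then-reduce shape, here is a dependency-free sketch of the same idea. It deliberately uses `std::thread::scope` instead of Rayon, and `FileEntry`, `partition_key`, and `group_by_partition` are hypothetical names with an assumed `date=` path segment; the PR's actual types and key extraction may differ. Each worker folds its chunk into a local map, and the local maps are then merged into one.

```rust
use std::collections::HashMap;
use std::thread;

/// Hypothetical stand-in for a manifest file entry; only the path matters here.
#[derive(Debug, Clone, PartialEq)]
struct FileEntry {
    path: String,
}

/// Extracts a partition key such as "date=2025-08-18" from the path (assumed layout).
fn partition_key(entry: &FileEntry) -> String {
    entry
        .path
        .split('/')
        .find(|segment| segment.starts_with("date="))
        .unwrap_or("unknown")
        .to_string()
}

/// Groups entries by partition key using `workers` scoped threads.
fn group_by_partition(
    entries: Vec<FileEntry>,
    workers: usize,
) -> HashMap<String, Vec<FileEntry>> {
    let workers = workers.max(1);
    let chunk_size = ((entries.len() + workers - 1) / workers).max(1);

    // "Fold" step: every worker builds a map for its own chunk.
    let partials: Vec<HashMap<String, Vec<FileEntry>>> = thread::scope(|scope| {
        let handles: Vec<_> = entries
            .chunks(chunk_size)
            .map(|chunk| {
                scope.spawn(move || {
                    let mut local: HashMap<String, Vec<FileEntry>> = HashMap::new();
                    for entry in chunk {
                        local.entry(partition_key(entry)).or_default().push(entry.clone());
                    }
                    local
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("worker thread panicked"))
            .collect()
    });

    // "Reduce" step: merge the per-worker maps into a single map.
    let mut merged: HashMap<String, Vec<FileEntry>> = HashMap::new();
    for partial in partials {
        for (key, mut files) in partial {
            merged.entry(key).or_insert_with(Vec::new).append(&mut files);
        }
    }
    merged
}
```

Because each worker owns its local map, no locking is needed during the fold; contention is limited to the final single-threaded merge.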


260-288: Verify meta parameter usage in handle_existing_partition.

The function now correctly receives the meta parameter and uses it for creating manifests. Good fix from the previous review comment.


357-373: LGTM! Clean finalization logic.

The snapshot finalization properly extends the manifest list with new entries and updates statistics before persisting.


375-455: Good handling of first_event_at initialization.

The function properly initializes the first_event_at timestamp when creating the first manifest for a stream, correctly considering both custom and default time partitions.

coderabbitai[bot]
coderabbitai bot previously approved these changes Aug 18, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/handlers/http/ingest.rs (2)

121-121: Avoid binding unused value; rely on the validator’s side effects

The returned stream is only used by the redundant check below. After removing that block (see next comment), you can avoid the unused binding.

Apply this diff after removing the redundant block:

-    let stream = validate_stream_for_ingestion(&stream_name)?;
+    validate_stream_for_ingestion(&stream_name)?;

400-401: Avoid binding the returned stream value

Same as earlier: once the redundant check below is removed, the binding is unnecessary.

Apply this diff after removing the redundant block:

-    let stream = validate_stream_for_ingestion(&stream_name)?;
+    validate_stream_for_ingestion(&stream_name)?;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 674eed5 and 0d924dd.

📒 Files selected for processing (1)
  • src/handlers/http/ingest.rs (4 hunks)
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
📚 Learning: 2025-08-18T12:30:23.342Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T14:48:46.282Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:256-258
Timestamp: 2025-08-18T14:48:46.282Z
Learning: OTEL streams (OtelLogs, OtelMetrics, OtelTraces) in Parseable are intentionally designed to not support time partition functionality. The time_partition parameter should always be None for OTEL ingestion paths in flatten_and_push_logs calls.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T12:37:47.703Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.703Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Applied to files:

  • src/handlers/http/ingest.rs
🧬 Code Graph Analysis (1)
src/handlers/http/ingest.rs (1)
src/handlers/http/modal/utils/ingest_utils.rs (2)
  • validate_stream_for_ingestion (271-292)
  • flatten_and_push_logs (51-131)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
🔇 Additional comments (5)
src/handlers/http/ingest.rs (5)

32-32: Good call: centralizing ingestion validation via validate_stream_for_ingestion

Importing and using a single validator improves consistency across endpoints.


133-133: Passing None for time_partition is correct in OSS

Given OSS gating of time-partitioned ingestion, passing None is expected here.


256-256: OTEL ingestion correctly skips time-partitioning

Intentionally passing None aligns with the product decision that OTEL streams don’t support time partitions.


408-408: Passing None for time_partition is consistent with OSS behavior

Matches the design choice to disable time-partitioned ingestion in OSS.


127-131: Redundant time-partition block here; also yields wrong HTTP status (500) via CustomError

validate_stream_for_ingestion already rejects streams with a time partition. Duplicating the check here is dead code. More importantly, the validator currently returns PostError::CustomError, which maps to 500 (server error) for what is a client-side restriction in OSS. This should be a 400.

Action:

  • Remove this block and rely on the centralized validator.
  • Update validate_stream_for_ingestion to return a 400-producing variant (e.g., PostError::Invalid) with a clear OSS gating message.

Apply this diff to remove the redundant local check:

-    if stream.get_time_partition().is_some() {
-        return Err(PostError::CustomError(
-            "Ingestion is not allowed to stream with time partition".to_string(),
-        ));
-    }

Proposed change in src/handlers/http/modal/utils/ingest_utils.rs (outside this file) to fix the status code and unify the message:

// inside validate_stream_for_ingestion(...)
if stream.get_time_partition().is_some() {
    return Err(PostError::Invalid(anyhow::anyhow!(
        "Ingestion is not allowed for streams with time partition in Parseable OSS. Please upgrade to Parseable Enterprise to enable this feature"
    )));
}
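
The status-code concern can be made concrete with a small standalone sketch. `IngestError`, `status_code`, and `reject_time_partitioned` are hypothetical names; the 400 and 500 mappings are taken from the comment above and are not verified against the real `PostError` implementation.

```rust
/// Hypothetical error type mirroring the two variants discussed in the comment.
#[derive(Debug, PartialEq)]
enum IngestError {
    /// Client-side problem; assumed to map to HTTP 400.
    Invalid(String),
    /// Catch-all; the comment states this maps to HTTP 500.
    CustomError(String),
}

impl IngestError {
    /// Status code a handler would return for this error (assumed mapping).
    fn status_code(&self) -> u16 {
        match self {
            IngestError::Invalid(_) => 400,
            IngestError::CustomError(_) => 500,
        }
    }
}

/// Rejects ingestion when the stream has a time partition configured,
/// using the client-error variant so the caller sees a 400.
fn reject_time_partitioned(time_partition: Option<&str>) -> Result<(), IngestError> {
    match time_partition {
        Some(_) => Err(IngestError::Invalid(
            "Ingestion is not allowed to stream with time partition".to_string(),
        )),
        None => Ok(()),
    }
}
```

Choosing the variant at the single validation point means every handler that calls the validator reports the restriction with the same status and message.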
⛔ Skipped due to learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.703Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.
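
The defaulting expression quoted in the learning above can be exercised in isolation. This is a sketch only: the `NonZeroU32` type is inferred from the `.get()` call, and both function names, as well as the inclusive boundary in `within_limit`, are assumptions rather than the project's code.

```rust
use std::num::NonZeroU32;

/// Returns the configured limit in days, falling back to 30 when none is set.
fn effective_limit_days(limit: Option<NonZeroU32>) -> i64 {
    limit.map_or(30, |days| days.get() as i64)
}

/// Hypothetical check: is an event that is `event_age_days` old still accepted?
/// The inclusive comparison is an assumption made for this sketch.
fn within_limit(event_age_days: i64, limit: Option<NonZeroU32>) -> bool {
    event_age_days <= effective_limit_days(limit)
}
```

Because the fallback lives inside the lookup, callers can pass `None` through unchanged and still get the 30-day behaviour.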

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (1)
src/handlers/http/modal/utils/ingest_utils.rs (1)

285-289: Consistency nit: unify OSS gating message for time-partitioned streams

Elsewhere the message was “Ingestion is not allowed to stream with time partition”. Consider using a single consistent message across the codebase to avoid surprises in logs/tests.

-            "Ingestion with time partition is not supported in Parseable OSS"
+            "Ingestion is not allowed to stream with time partition"
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0d924dd and 1f0c726.

📒 Files selected for processing (2)
  • src/handlers/http/ingest.rs (4 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/handlers/http/ingest.rs
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
📚 Learning: 2025-08-18T14:48:46.282Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:256-258
Timestamp: 2025-08-18T14:48:46.282Z
Learning: OTEL streams (OtelLogs, OtelMetrics, OtelTraces) in Parseable are intentionally designed to not support time partition functionality. The time_partition parameter should always be None for OTEL ingestion paths in flatten_and_push_logs calls.

Applied to files:

  • src/handlers/http/modal/utils/ingest_utils.rs
📚 Learning: 2025-08-18T12:37:47.703Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.703Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/handlers/http/modal/utils/ingest_utils.rs
📚 Learning: 2025-08-18T12:30:23.342Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.

Applied to files:

  • src/handlers/http/modal/utils/ingest_utils.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/utils/ingest_utils.rs
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Applied to files:

  • src/handlers/http/modal/utils/ingest_utils.rs
🧬 Code Graph Analysis (1)
src/handlers/http/modal/utils/ingest_utils.rs (5)
src/otel/logs.rs (1)
  • flatten_otel_logs (202-209)
src/otel/traces.rs (1)
  • flatten_otel_traces (160-167)
src/otel/metrics.rs (1)
  • flatten_otel_metrics (585-596)
src/utils/json/mod.rs (1)
  • convert_array_to_object (219-256)
src/static_schema.rs (1)
  • custom_partition (73-73)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
🔇 Additional comments (3)
src/handlers/http/modal/utils/ingest_utils.rs (3)

56-56: Threading time_partition parameter: good propagation, but confirm intended source-of-truth

Passing time_partition through both flatten_and_push_logs and push_logs looks correct. However, since push_logs no longer derives a fallback from the stream configuration, ensure that this is intentional for enterprise builds where a stream-level default time partition may exist. If a fallback to the stream’s configured time partition is still desired (when header is absent), see my suggestion on Lines 149-156,168.

Also applies to: 138-138


67-75: Kinesis path propagation LGTM

Forwarding the optional time_partition to push_logs for Kinesis is correct and consistent with the other branches.


118-127: Default path propagation LGTM

Passing the optional time_partition through the generic path is consistent.

coderabbitai[bot]
coderabbitai bot previously approved these changes Aug 18, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (4)
src/handlers/http/ingest.rs (4)

163-163: Clarify OTEL’s relationship to time_partition and avoid tuple-footgun

Returning time_partition as an Option<String> from setup_otel_stream is fine for Enterprise plumbing, but in OSS and for OTEL ingestion we intentionally ignore it. Consider documenting that explicitly to prevent accidental use in OTEL paths. Also consider a small struct (e.g., SetupOtelResult) instead of a positional 4-tuple to reduce the risk of misuse later.

Apply this lightweight doc diff:

+/// Returns `(stream_name, log_source, log_source_entry, time_partition)`.
+///
+/// Note: OTEL ingestion paths in OSS intentionally ignore `time_partition` and always
+/// pass `None` downstream. The fourth element exists for Enterprise builds.
+/// Consider migrating to a named struct to avoid positional tuple confusion.
 pub async fn setup_otel_stream(
     req: &HttpRequest,
     expected_log_source: LogSource,
     known_fields: &[&str],
     telemetry_type: TelemetryType,
 ) -> Result<(String, LogSource, LogSourceEntry, Option<String>), PostError> {
@@
+    // For Enterprise builds, we surface the stream's configured time partition; OTEL
+    // paths in OSS must ignore this (see process_otel_content).
     let mut time_partition = None;

Also applies to: 193-229
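
A named result type along the lines suggested above might look like the following sketch. Everything here is hypothetical: `SetupOtelResult` is only a name floated in the comment, `String` stands in for the real `LogSource` and `LogSourceEntry` types, and the literal values are placeholders.

```rust
/// Hypothetical named result replacing the positional 4-tuple.
/// `String` stands in for the real `LogSource` and `LogSourceEntry` types.
#[derive(Debug, Clone, PartialEq)]
struct SetupOtelResult {
    stream_name: String,
    log_source: String,
    log_source_entry: String,
    /// Surfaced for Enterprise builds; OTEL paths in OSS must ignore it.
    time_partition: Option<String>,
}

/// Simplified stand-in for the setup function; the values are placeholders.
fn setup_otel_stream_sketch(
    stream_name: &str,
    configured_partition: Option<&str>,
) -> SetupOtelResult {
    SetupOtelResult {
        stream_name: stream_name.to_string(),
        log_source: "otel-logs".to_string(),
        log_source_entry: "otel-logs:default".to_string(),
        time_partition: configured_partition.map(str::to_string),
    }
}

/// Handler-side use: take only the fields needed and pass `None`
/// as the time partition, as OTEL ingestion in OSS requires.
fn handler_inputs(result: SetupOtelResult) -> (String, String, Option<String>) {
    let SetupOtelResult { stream_name, log_source, .. } = result;
    (stream_name, log_source, None)
}
```

Destructuring by field name with `..` makes it visible at the call site that `time_partition` is being dropped on purpose, which a `(_, _)` pattern on a tuple does not convey.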


247-254: Confirmed: OTEL JSON path correctly forces time_partition=None in OSS

Explicitly passing None ensures time-partition validation is not applied on OTEL ingestion, as intended for OSS.

Add a clarifying comment:

                 flatten_and_push_logs(
                     serde_json::from_slice(&body)?,
                     stream_name,
                     log_source,
                     &p_custom_fields,
-                    None,
+                    None, // OTEL + OSS: time partition not supported
                 )

283-289: Handlers intentionally ignore returned time_partition; that’s fine—document intent

Destructuring (stream_name, log_source, _, _) and ignoring time_partition is consistent with the OTEL restriction. A brief comment will help future readers avoid accidentally plumbing it through OTEL paths in OSS.

Apply this tiny diff in each handler:

-    let (stream_name, log_source, _, _) = setup_otel_stream(
+    // Ignore time_partition in OTEL + OSS
+    let (stream_name, log_source, _, _) = setup_otel_stream(

Also applies to: 303-309, 323-329


396-399: Good: post_event uses the shared validator and enforces OSS gating

validate_stream_for_ingestion centralizes policy; passing None keeps OSS semantics. Consider adding a test mirroring the ingest() test to cover post_event as well (expect HTTP 400 with the standardized OSS message).

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1f0c726 and 5771c7e.

📒 Files selected for processing (1)
  • src/handlers/http/ingest.rs (10 hunks)
🧰 Additional context used
🧠 Learnings (9)
📓 Common learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
📚 Learning: 2025-08-18T18:01:22.804Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/modal/utils/ingest_utils.rs:271-292
Timestamp: 2025-08-18T18:01:22.804Z
Learning: In Parseable's ingestion validation, validate_stream_for_ingestion is designed to prevent regular log ingestion endpoints (ingest() and post_event()) from ingesting into streams that exclusively contain OTEL traces or metrics. The function allows mixed streams (regular logs + OTEL) but blocks ingestion into OTEL-only streams, maintaining architectural separation between regular log and OTEL ingestion pathways.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T17:59:31.600Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/modal/utils/ingest_utils.rs:149-156
Timestamp: 2025-08-18T17:59:31.600Z
Learning: The time_partition parameter in push_logs() function in src/handlers/http/modal/utils/ingest_utils.rs is determined by the caller (flatten_and_push_logs). OSS callers pass None, enterprise callers pass the appropriate value (None or Some<>), and OTEL callers always pass None. The push_logs() function should not add additional time partition logic since it's already handled at the caller level.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T12:30:23.342Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T17:58:01.277Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/modal/utils/ingest_utils.rs:80-87
Timestamp: 2025-08-18T17:58:01.277Z
Learning: The architectural approach in Parseable for OTEL time partition restrictions is to enforce time_partition = None at the caller level (flatten_and_push_logs) rather than inside push_logs() itself. This allows enterprise versions to reuse push_logs() by passing actual time_partition values while OSS maintains the restriction at the ingestion entry point.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T14:48:46.282Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:256-258
Timestamp: 2025-08-18T14:48:46.282Z
Learning: OTEL streams (OtelLogs, OtelMetrics, OtelTraces) in Parseable are intentionally designed to not support time partition functionality. The time_partition parameter should always be None for OTEL ingestion paths in flatten_and_push_logs calls.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T12:37:47.703Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.703Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Applied to files:

  • src/handlers/http/ingest.rs
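
The 30-day default described in the validate_time_partition learning above can be sketched as follows. This is a minimal illustration, not the code in src/utils/json/flatten.rs: the function names effective_limit_days and within_limit are hypothetical, and the limit is assumed to be an Option<NonZeroU32> because the quoted expression calls .get() on it.

```rust
use std::num::NonZeroU32;

/// Days of history accepted when the stream sets no explicit limit.
const DEFAULT_LIMIT_DAYS: i64 = 30;

/// Resolve the effective limit with the same `map_or` shape quoted in the
/// learning. The parameter type is an assumption made for this sketch.
pub fn effective_limit_days(time_partition_limit: Option<NonZeroU32>) -> i64 {
    time_partition_limit.map_or(DEFAULT_LIMIT_DAYS, |days| days.get() as i64)
}

/// Hypothetical helper: is an event that is `age_days` old still inside
/// the accepted historical window?
pub fn within_limit(age_days: i64, time_partition_limit: Option<NonZeroU32>) -> bool {
    age_days <= effective_limit_days(time_partition_limit)
}
```

Because the default is applied at validation time, a stream created without a limit can keep None in its metadata and still be checked against 30 days.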
🧬 Code Graph Analysis (1)
src/handlers/http/ingest.rs (2)
src/handlers/http/modal/utils/ingest_utils.rs (2)
  • validate_stream_for_ingestion (271-292)
  • flatten_and_push_logs (51-131)
src/validator.rs (1)
  • stream_name (33-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (1)
src/handlers/http/ingest.rs (1)

32-32: LGTM: centralized validator import is correct

Importing validate_stream_for_ingestion here aligns ingestion checks across endpoints and removes ad-hoc duplication.
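
For readers without the diff open, here is a rough sketch of what a centralized validator of this kind might look like, based only on the behavior described in the learnings (reject OTEL-only streams, allow mixed streams, and block time-partitioned streams in OSS). The types LogSource, StreamInfo, and IngestError are hypothetical simplifications, and the real function in ingest_utils.rs takes different arguments.

```rust
/// Hypothetical, simplified log-source kinds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogSource {
    Json,
    OtelLogs,
    OtelMetrics,
    OtelTraces,
}

/// Hypothetical view of the stream metadata the validator needs.
pub struct StreamInfo {
    pub log_sources: Vec<LogSource>,
    pub time_partition: Option<String>,
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum IngestError {
    OtelOnlyStream,
    TimePartitionNotAllowed,
}

/// One shared gate called by every regular ingestion endpoint.
pub fn validate_stream_for_ingestion(stream: &StreamInfo) -> Result<(), IngestError> {
    // Reject streams that hold only OTEL traces or metrics; mixed streams pass.
    let otel_only = !stream.log_sources.is_empty()
        && stream
            .log_sources
            .iter()
            .all(|s| matches!(s, LogSource::OtelMetrics | LogSource::OtelTraces));
    if otel_only {
        return Err(IngestError::OtelOnlyStream);
    }
    // OSS gate: refuse streams that were created with a time partition.
    if stream.time_partition.is_some() {
        return Err(IngestError::TimePartitionNotAllowed);
    }
    Ok(())
}
```

Keeping both checks in one function means each endpoint needs a single call, which is the duplication the review comment refers to.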

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
src/handlers/http/ingest.rs (3)

127-127: Clarify why time_partition is forced to None in OSS

Add a short comment so future changes don’t accidentally plumb time partitions through OSS paths.

Apply this diff:

-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
+    // OSS: time partition is Enterprise-only; always pass None.
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;

247-253: OTEL ingestion correctly passes None for time_partition in OSS

Consistent with the policy that OTEL streams don’t support time partitions in OSS.


398-398: Mirror the OSS None-pass pattern with an explanatory comment

Add the same OSS gating note here for consistency with the /ingest handler.

Apply this diff:

-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
+    // OSS: time partition is Enterprise-only; always pass None.
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
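
The caller-level gating that these comments document can be illustrated with a small sketch. It assumes a simplified push_logs that only reports how it would write a batch; PushMode, oss_ingest, and enterprise_ingest are hypothetical names introduced here, not functions from the PR.

```rust
/// Hypothetical outcome describing how a batch would be written.
#[derive(Debug, PartialEq, Eq)]
pub enum PushMode {
    Unpartitioned,
    PartitionedBy(String),
}

/// Shared lower-level routine: it trusts whatever the caller decided and
/// adds no time-partition policy of its own.
pub fn push_logs(time_partition: Option<&str>) -> PushMode {
    match time_partition {
        Some(field) => PushMode::PartitionedBy(field.to_string()),
        None => PushMode::Unpartitioned,
    }
}

/// OSS entry point: ignores the stream's setting.
pub fn oss_ingest(_stream_time_partition: Option<&str>) -> PushMode {
    // OSS: time partition is Enterprise-only; always pass None.
    push_logs(None)
}

/// Hypothetical Enterprise entry point: forwards the stream's setting.
pub fn enterprise_ingest(stream_time_partition: Option<&str>) -> PushMode {
    push_logs(stream_time_partition)
}
```

The policy lives in the entry points, so the shared routine can be reused unchanged by builds that do support time partitions.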
🧹 Nitpick comments (1)
src/handlers/http/ingest.rs (1)

193-222: Return time_partition for Enterprise parity; add a note to avoid confusion in OSS

Returning time_partition here is fine, but OSS OTEL paths intentionally ignore it. A brief comment helps future maintainers.

Apply this diff:

-    let mut time_partition = None;
+    // NOTE: OTEL time partition is Enterprise-only. We return it here for
+    //       Enterprise builds, but OSS paths intentionally ignore it.
+    let mut time_partition = None;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5771c7e and 64eb704.

📒 Files selected for processing (1)
  • src/handlers/http/ingest.rs (10 hunks)
🧰 Additional context used
🧠 Learnings (9)
📓 Common learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
📚 Learning: 2025-08-18T18:01:22.804Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/modal/utils/ingest_utils.rs:271-292
Timestamp: 2025-08-18T18:01:22.804Z
Learning: In Parseable's ingestion validation, validate_stream_for_ingestion is designed to prevent regular log ingestion endpoints (ingest() and post_event()) from ingesting into streams that exclusively contain OTEL traces or metrics. The function allows mixed streams (regular logs + OTEL) but blocks ingestion into OTEL-only streams, maintaining architectural separation between regular log and OTEL ingestion pathways.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T17:59:31.600Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/modal/utils/ingest_utils.rs:149-156
Timestamp: 2025-08-18T17:59:31.600Z
Learning: The time_partition parameter in push_logs() function in src/handlers/http/modal/utils/ingest_utils.rs is determined by the caller (flatten_and_push_logs). OSS callers pass None, enterprise callers pass the appropriate value (None or Some(..)), and OTEL callers always pass None. The push_logs() function should not add additional time partition logic since it's already handled at the caller level.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T12:30:23.342Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T17:58:01.277Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/modal/utils/ingest_utils.rs:80-87
Timestamp: 2025-08-18T17:58:01.277Z
Learning: The architectural approach in Parseable for OTEL time partition restrictions is to enforce time_partition = None at the caller level (flatten_and_push_logs) rather than inside push_logs() itself. This allows enterprise versions to reuse push_logs() by passing actual time_partition values while OSS maintains the restriction at the ingestion entry point.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T14:48:46.282Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:256-258
Timestamp: 2025-08-18T14:48:46.282Z
Learning: OTEL streams (OtelLogs, OtelMetrics, OtelTraces) in Parseable are intentionally designed to not support time partition functionality. The time_partition parameter should always be None for OTEL ingestion paths in flatten_and_push_logs calls.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T12:37:47.703Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.703Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Applied to files:

  • src/handlers/http/ingest.rs
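
The stream existence pattern quoted in the logstream learning above is easy to misread because of the nested negations. The sketch below models it with plain booleans; Mode here is a hypothetical two-variant enum, and the storage lookup is passed as a closure instead of the async create_stream_and_schema_from_storage call.

```rust
/// Hypothetical, reduced set of server modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Query,
    Standalone,
}

/// Returns true when the handler should report "stream not found".
///
/// `in_memory` stands for `PARSEABLE.streams.contains(..)`, and
/// `load_from_storage` stands for the storage fallback, which returns true
/// if the stream was found in storage and loaded.
pub fn stream_missing(
    in_memory: bool,
    mode: Mode,
    load_from_storage: impl FnOnce() -> bool,
) -> bool {
    // Short-circuit order matters: storage is consulted only when the stream
    // is not in memory and the server runs in query mode.
    !in_memory && (mode != Mode::Query || !load_from_storage())
}
```

In standalone mode a stream absent from memory is reported missing immediately, while in query mode it is reported missing only if the storage fallback also fails.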
🧬 Code Graph Analysis (1)
src/handlers/http/ingest.rs (1)
src/handlers/http/modal/utils/ingest_utils.rs (2)
  • validate_stream_for_ingestion (271-292)
  • flatten_and_push_logs (51-131)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (5)
src/handlers/http/ingest.rs (5)

32-32: Good call: centralize ingestion validation via shared helper

Importing validate_stream_for_ingestion consolidates gating (OTEL-only streams + OSS time-partition block) and avoids per-endpoint drift.


121-121: Centralized stream compatibility + OSS feature gating is correctly applied

Calling validate_stream_for_ingestion before ingesting ensures consistent errors and avoids duplication of checks across endpoints.


228-229: LGTM: setup_otel_stream now exposes time_partition

Returning the stream’s time_partition enables Enterprise builds to use it downstream without another fetch.


283-289: Destructuring with .. safely adapts to the new 4-tuple

Ignoring the extra return item keeps code minimal and forward-compatible across editions.

Also applies to: 303-309, 323-329
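
As a quick illustration of why the `..` rest pattern keeps these call sites stable, consider the sketch below. The function setup_otel_stream_sketch is a hypothetical synchronous stand-in with made-up element types; only the idea of a fourth, ignorable tuple element is taken from the review.

```rust
/// Hypothetical stand-in for `setup_otel_stream`; the element types are
/// assumptions made for this sketch.
pub fn setup_otel_stream_sketch() -> (String, String, Vec<String>, Option<String>) {
    (
        "otel-stream".to_string(),
        "otel-logs".to_string(),
        vec!["env=dev".to_string()],
        None, // time_partition: the newly added fourth element
    )
}

/// A caller that needs only the first three values.
pub fn caller_ignoring_time_partition() -> (String, String, usize) {
    // `..` matches any remaining tuple elements, so this pattern keeps
    // compiling if further elements are appended to the return type.
    let (stream_name, log_source, custom_fields, ..) = setup_otel_stream_sketch();
    (stream_name, log_source, custom_fields.len())
}
```

A caller that does need the partition would bind the fourth position explicitly instead of using `..`.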


396-396: Centralized validation also applied to /logstream/{logstream}

Consistent enforcement across both ingest endpoints. Good.

@nitisht nitisht merged commit 79dd900 into parseablehq:main Aug 19, 2025
13 of 15 checks passed
@nikhilsinhaparseable nikhilsinhaparseable deleted the enable-time-partition branch August 19, 2025 07:26
Development

Successfully merging this pull request may close these issues.

Possibility to change the default time partition field to query over historical data
2 participants