Time partition restrictions #1405
Walkthrough
Batch-oriented, partition-aware ingestion, JSON flattening, concurrent staging upload, and catalog snapshot updates added per-day partition processing; query API simplified to per-table time-partition inference. Also exposes several handler constants and adds
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client
participant HTTP_Ingest
participant IngestUtils
participant JSONUtils
participant Staging
Client->>HTTP_Ingest: POST /ingest (body, headers)
HTTP_Ingest->>IngestUtils: validate_stream_for_ingestion(stream_name)
alt invalid
IngestUtils-->>Client: PostError
else
HTTP_Ingest->>IngestUtils: flatten_and_push_logs(body,..., time_partition?)
IngestUtils->>JSONUtils: convert_array_to_object(body, time_partition, ...)
JSONUtils-->>IngestUtils: Vec<Value> (partitioned batches or single batch)
IngestUtils->>Staging: write staged files
Staging-->>Client: 200 OK
end
sequenceDiagram
participant Staging
participant ObjStorage
participant WorkerPool
participant StorageBackend
participant Catalog
participant Stats
Staging->>ObjStorage: process_parquet_files(stream, files)
ObjStorage->>WorkerPool: spawn upload tasks (bounded semaphore)
WorkerPool->>StorageBackend: upload(file)
WorkerPool->>Catalog: build manifest(file)
WorkerPool->>Stats: calculate dataset stats (optional)
WorkerPool-->>ObjStorage: UploadResult(manifest?, stats_flag)
ObjStorage->>Catalog: update_snapshot(manifests: Vec<File>)
ObjStorage->>Stats: handle_stats_sync(if any)
ObjStorage-->>Staging: cleanup staged files
sequenceDiagram
participant Producer
participant Catalog
participant Partitioner
Producer->>Catalog: update_snapshot(changes: Vec<File>)
Catalog->>Partitioner: group_changes_by_partition (parallel)
Partitioner-->>Catalog: map(partition -> Vec<File>)
loop per partition
Catalog->>Catalog: process_single_partition(files) -> manifest_item?
end
Catalog-->>Producer: Ok
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Create a stream with a custom time partition using these headers:
- X-P-Time-Partition: <field-name> (this has to be a timestamp field)
- X-P-Time-Partition-Limit: maximum historical range, defaults to 30d
The server validates that every event in the batch has this time-partition field; if any event lacks it, the server rejects the whole batch.
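The all-or-nothing batch rule described above can be sketched roughly as follows. This is a minimal illustration using only the standard library, not the server's actual code: the `Event` alias and the `validate_batch` function are hypothetical names, and events are modelled as flat string maps rather than JSON values.

```rust
use std::collections::HashMap;

/// Hypothetical event shape: a flat map from field name to raw string value.
type Event = HashMap<String, String>;

/// Returns Ok(()) only if every event carries `time_partition`.
/// The first event missing the field rejects the whole batch.
fn validate_batch(events: &[Event], time_partition: &str) -> Result<(), String> {
    for (index, event) in events.iter().enumerate() {
        if !event.contains_key(time_partition) {
            return Err(format!(
                "event {index} is missing time-partition field '{time_partition}'"
            ));
        }
    }
    Ok(())
}
```

Returning on the first missing field keeps the check cheap for large batches; a real implementation would presumably also verify that the value parses as a timestamp and falls within the configured limit.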
48d93ce to f05b382
Actionable comments posted: 6
🔭 Outside diff range comments (4)
src/handlers/http/ingest.rs (1)
256-270: OTEL ingestion should also pass effective time_partition
To uniformly enforce time-partition validation for OTEL, compute the same effective time partition (header override or stream default) and pass it to flatten_and_push_logs.
-    flatten_and_push_logs(
-        serde_json::from_slice(&body)?,
-        stream_name,
-        log_source,
-        &p_custom_fields,
-        None,
-    )
+    let time_partition = req
+        .headers()
+        .get("x-p-time-partition")
+        .and_then(|h| h.to_str().ok())
+        .map(|s| s.to_string())
+        .or_else(|| PARSEABLE.get_stream(stream_name).ok()?.get_time_partition());
+
+    flatten_and_push_logs(
+        serde_json::from_slice(&body)?,
+        stream_name,
+        log_source,
+        &p_custom_fields,
+        time_partition,
+    )
     .await?;
src/handlers/http/modal/utils/ingest_utils.rs (1)
133-139: Regression: stream-configured time_partition no longer enforced when caller passes None
push_logs now only uses the passed time_partition and no longer derives it from the stream. If call sites forget to pass it (several currently pass None), time-partition validation silently won't run. To preserve correctness:
- Compute an effective time partition: per-request override if present, else stream-configured.
- Use that for both convert_array_to_object and into_event.
 pub async fn push_logs(
     stream_name: &str,
     json: Value,
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
-    time_partition: Option<String>,
+    time_partition: Option<String>,
 ) -> Result<(), PostError> {
     let stream = PARSEABLE.get_stream(stream_name)?;
     let time_partition_limit = PARSEABLE
         .get_stream(stream_name)?
         .get_time_partition_limit();
     let static_schema_flag = stream.get_static_schema_flag();
     let custom_partition = stream.get_custom_partition();
     let schema_version = stream.get_schema_version();
     let p_timestamp = Utc::now();
-    let data = convert_array_to_object(
+    // Prefer per-request override, else fall back to the stream-configured time partition
+    let effective_time_partition = time_partition.or_else(|| stream.get_time_partition());
+
+    let data = convert_array_to_object(
         json,
-        time_partition.as_ref(),
+        effective_time_partition.as_ref(),
         time_partition_limit,
         custom_partition.as_ref(),
         schema_version,
         log_source,
     )?;
     for json in data {
         let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
         let schema = PARSEABLE.get_stream(stream_name)?.get_schema_raw();
         json::Event { json, p_timestamp }
             .into_event(
                 stream_name.to_owned(),
                 origin_size,
                 &schema,
                 static_schema_flag,
                 custom_partition.as_ref(),
-                time_partition.as_ref(),
+                effective_time_partition.as_ref(),
                 schema_version,
                 StreamType::UserDefined,
                 p_custom_fields,
             )?
             .process()?;
     }
     Ok(())
 }
Also applies to: 149-156, 168-169
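The override-then-fallback rule suggested in this comment boils down to a single `Option` combinator. The sketch below isolates it; `effective_time_partition` is a hypothetical helper name, and both arguments are assumed to be already extracted from the request header and the stream metadata.

```rust
/// Picks the time partition to validate against: the per-request override
/// wins, otherwise the stream-configured value is used.
fn effective_time_partition(
    request_override: Option<String>,
    stream_default: Option<String>,
) -> Option<String> {
    request_override.or(stream_default)
}
```

`or` is enough when the fallback is already in hand; `or_else` (as in the diff above) is the better fit when computing the fallback requires a lookup that should be skipped if an override exists.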
src/catalog/mod.rs (1)
90-109: Unsafe unwraps in get_file_bounds can crash the node
find(...).unwrap() and stats.as_ref().unwrap() will panic if:
- the expected column is missing in a file, or
- stats are absent (e.g., file with schema/metadata drift).
Prefer returning a handled error or skipping such files with a log. Minimal fix:
-fn get_file_bounds(
-    file: &manifest::File,
-    partition_column: String,
-) -> (DateTime<Utc>, DateTime<Utc>) {
-    match file
-        .columns()
-        .iter()
-        .find(|col| col.name == partition_column)
-        .unwrap()
-        .stats
-        .as_ref()
-        .unwrap()
-    {
+fn get_file_bounds(
+    file: &manifest::File,
+    partition_column: String,
+) -> (DateTime<Utc>, DateTime<Utc>) {
+    let col = match file.columns().iter().find(|col| col.name == partition_column) {
+        Some(c) => c,
+        None => {
+            error!("Partition column {partition_column} not found in manifest file {}", file.file_path);
+            // Fallback: treat as empty range to avoid crash; adjust if a different strategy is preferred
+            return (DateTime::from_timestamp_millis(0).unwrap(), DateTime::from_timestamp_millis(0).unwrap());
+        }
+    };
+    let stats = match col.stats.as_ref() {
+        Some(s) => s,
+        None => {
+            error!("Missing stats for column {partition_column} in {}", file.file_path);
+            return (DateTime::from_timestamp_millis(0).unwrap(), DateTime::from_timestamp_millis(0).unwrap());
+        }
+    };
+    match stats {
         column::TypedStatistics::Int(stats) => (
             DateTime::from_timestamp_millis(stats.min).unwrap(),
             DateTime::from_timestamp_millis(stats.max).unwrap(),
         ),
         _ => unreachable!(),
     }
 }
If a hard error is preferred instead of a fallback, we can return a Result and propagate it.
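For the Result-returning alternative mentioned at the end of this comment, a rough sketch might look like the following. The `Column`, `IntStats`, and `BoundsError` types are simplified, hypothetical stand-ins for the manifest types, and bounds are returned as raw epoch milliseconds to keep the example free of external crates.

```rust
#[derive(Debug, PartialEq)]
enum BoundsError {
    MissingColumn(String),
    MissingStats(String),
}

/// Hypothetical, simplified stand-ins for the manifest types.
struct IntStats {
    min: i64,
    max: i64,
}

struct Column {
    name: String,
    stats: Option<IntStats>,
}

/// Returns (min, max) in epoch milliseconds for `partition_column`,
/// or an error instead of panicking when the column or its stats are absent.
fn file_bounds(columns: &[Column], partition_column: &str) -> Result<(i64, i64), BoundsError> {
    let column = columns
        .iter()
        .find(|col| col.name == partition_column)
        .ok_or_else(|| BoundsError::MissingColumn(partition_column.to_string()))?;
    let stats = column
        .stats
        .as_ref()
        .ok_or_else(|| BoundsError::MissingStats(partition_column.to_string()))?;
    Ok((stats.min, stats.max))
}
```

Compared with the zero-range fallback, this leaves the decision (skip the file, fail the snapshot update, or log and continue) to the caller.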
src/utils/json/mod.rs (1)
195-217: Document wrapping of array inputs in convert_array_to_object
Call sites and tests confirm that when body is a JSON array (and no partitioning), the entire array is returned as a single Vec<Value>. Please update the docstring on src/utils/json/mod.rs (around the convert_array_to_object signature, ~line 219) to explicitly state:
- For array inputs without partitioning, the function wraps the full array into a one-element Vec.
[src/utils/json/mod.rs:219]
🧹 Nitpick comments (14)
src/parseable/streams.rs (1)
1368-1371: Avoid println in tests; assert update is correct.
The updated assertion to expect 3 Arrow files is correct for the same-minute case before compaction. Replace the println with tracing to keep tests clean.
Apply this diff:
-    println!("arrow files: {:?}", staging.arrow_files());
+    tracing::debug!("arrow files: {:?}", staging.arrow_files());
src/parseable/mod.rs (1)
922-931: Custom partition validation limits to a single key.
Matches the documented constraint and simplifies downstream layout. Consider updating the error message to include the provided value for quicker debugging.
Apply this diff:
-    return Err(CreateStreamError::Custom {
-        msg: "Maximum 1 custom partition key is supported".to_string(),
+    return Err(CreateStreamError::Custom {
+        msg: format!("Maximum 1 custom partition key is supported (got: {custom_partition})"),
         status: StatusCode::BAD_REQUEST,
     });
src/storage/field_stats.rs (3)
180-181: Lowering log level to trace may hide actionable failures
These failures previously surfaced at warn!. Moving to trace! will make them easy to miss in production. Consider debug! with field context (stream/field) so operators can correlate silent drops.
-    trace!("Failed to execute distinct stats query: {e}");
+    debug!("Failed to execute distinct stats query: {e}");
188-189: Same concern: batch fetch errors downgraded to trace
If the stream is noisy, this will fly under the radar. Suggest debug! and include stream_name/field_name to aid troubleshooting.
-    trace!("Failed to fetch batch in distinct stats query: {e}");
+    debug!("Failed to fetch batch in distinct stats query: {e}");
216-217: Field-specific query failure should not be fully silent
trace! here likely hides intermittent schema/data issues. Recommend debug! (or keep warn!) to retain visibility.
-    trace!("Failed to execute distinct stats query for field: {field_name}, error: {e}");
+    debug!("Failed to execute distinct stats query for field: {field_name}, error: {e}");
src/handlers/http/ingest.rs (1)
515-545: HTTP status for CustomError
PostError::CustomError maps to 500. For client-side misuse (e.g., bad header combo), 400 is more appropriate. Consider a dedicated variant (e.g., TimePartitionIngestionError) mapped to 400 instead of reusing CustomError.
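A dedicated variant with its own status mapping could be sketched along these lines. The enum below is a hypothetical, trimmed-down stand-in (the real PostError has more variants and presumably maps to a framework status type rather than a bare `u16`); only the two variant names come from the comment above.

```rust
/// Hypothetical, trimmed-down error type for illustration only.
#[derive(Debug)]
enum PostError {
    /// Client-side misuse, e.g. an invalid header combination.
    TimePartitionIngestionError(String),
    /// Anything unexpected on the server side.
    CustomError(String),
}

impl PostError {
    /// Maps each variant to a numeric HTTP status code.
    fn status_code(&self) -> u16 {
        match self {
            PostError::TimePartitionIngestionError(_) => 400,
            PostError::CustomError(_) => 500,
        }
    }
}
```

Keeping the mapping in one exhaustive `match` means any variant added later must be given a status explicitly, rather than silently inheriting 500.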
src/catalog/mod.rs (1)
130-158: Parallel grouping is fine; consider determinism only if needed
Using Rayon to group by partition is fine. If deterministic processing order ever matters, add a final sort of keys before iteration. Not required right now.
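If deterministic order does become a requirement, one option is to collect groups into an ordered map so that iteration is sorted by key. The sketch below is sequential and standard-library only, so it is not the PR's Rayon-based grouping; `group_by_partition` and the `(partition, file)` string pairs are assumptions made for illustration.

```rust
use std::collections::BTreeMap;

/// Groups (partition, file) pairs by partition key. A BTreeMap iterates in
/// sorted key order, so downstream processing order is deterministic.
fn group_by_partition(changes: Vec<(String, String)>) -> BTreeMap<String, Vec<String>> {
    let mut groups: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for (partition, file) in changes {
        groups.entry(partition).or_default().push(file);
    }
    groups
}
```

With a parallel grouping step, the same effect can be had by sorting the resulting keys once before the per-partition loop.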
src/storage/object_storage.rs (2)
972-999: Minor: spawn_parquet_upload_task doesn't need to be async
This function only schedules a task; making it fn avoids an unnecessary .await.
-async fn spawn_parquet_upload_task(
+fn spawn_parquet_upload_task(
     join_set: &mut JoinSet<Result<UploadResult, ObjectStorageError>>,
@@
-) {
+) {
@@
-    join_set.spawn(async move {
+    join_set.spawn(async move {
         let _permit = semaphore.acquire().await.expect("semaphore is not closed");
         upload_single_parquet_file(store, path, stream_relative_path, stream_name, schema).await
     });
-}
+}
1001-1044: Result collection strategy is fine; consider best-effort behavior on single-file errors
Currently, the first per-file error aborts the entire sync with an error, potentially leaving other tasks running until JoinSet is dropped. If you prefer best-effort uploads, collect all successes and log failures, then proceed to snapshot update. Optional.
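The best-effort variant amounts to partitioning the per-file results instead of short-circuiting on the first error. A minimal, generic sketch follows; `collect_best_effort` is a hypothetical helper, and the results are assumed to have been drained from the task set into a `Vec` already.

```rust
/// Splits per-file upload results into successes and failures so that one
/// failed file does not abort the whole sync.
fn collect_best_effort<T, E>(results: Vec<Result<T, E>>) -> (Vec<T>, Vec<E>) {
    let mut uploaded = Vec::new();
    let mut failed = Vec::new();
    for result in results {
        match result {
            Ok(value) => uploaded.push(value),
            Err(error) => failed.push(error),
        }
    }
    (uploaded, failed)
}
```

The caller could then log each entry in `failed`, leave those staged files in place for the next sync, and update the snapshot with `uploaded` only.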
src/utils/json/mod.rs (5)
66-75: De-duplicate gating logic: reuse this helper in flatten_json_body for consistency
You've introduced should_apply_generic_flattening() but flatten_json_body re-implements the same condition inline. Using the helper there avoids drift and keeps behavior consistent across partitioned and non-partitioned paths.
Outside this hunk, consider updating flatten_json_body as:
    let mut nested_value = if should_apply_generic_flattening(&body, schema_version, log_source) {
        let flattened_json = generic_flattening(&body)?;
        convert_to_array(flattened_json)?
    } else {
        body
    };
77-115: Simplify: always iterate the flattened results (remove special-casing len == 1)
Both branches call flatten::flatten on each resulting item; the only difference is avoiding a tiny loop for len == 1. Unify to reduce code and potential divergence.
Apply this diff:
 fn apply_generic_flattening_for_partition(
     element: Value,
     time_partition: Option<&String>,
     time_partition_limit: Option<NonZeroU32>,
     custom_partition: Option<&String>,
 ) -> Result<Vec<Value>, anyhow::Error> {
-    let flattened_json = generic_flattening(&element)?;
-
-    if flattened_json.len() == 1 {
-        // Single result - process normally
-        let mut nested_value = flattened_json.into_iter().next().unwrap();
-        flatten::flatten(
-            &mut nested_value,
-            "_",
-            time_partition,
-            time_partition_limit,
-            custom_partition,
-            true,
-        )?;
-        Ok(vec![nested_value])
-    } else {
-        // Multiple results - process each individually
-        let mut result = Vec::new();
-        for item in flattened_json {
-            let mut processed_item = item;
-            flatten::flatten(
-                &mut processed_item,
-                "_",
-                time_partition,
-                time_partition_limit,
-                custom_partition,
-                true,
-            )?;
-            result.push(processed_item);
-        }
-        Ok(result)
-    }
+    let flattened_json = generic_flattening(&element)?;
+    let mut result = Vec::with_capacity(flattened_json.len());
+    for mut item in flattened_json {
+        flatten::flatten(
+            &mut item,
+            "_",
+            time_partition,
+            time_partition_limit,
+            custom_partition,
+            true,
+        )?;
+        result.push(item);
+    }
+    Ok(result)
 }
149-171: Pre-allocate results; consider parallelizing for large batches
The loop is fine. Two small options:
- Pre-allocate: result.reserve(arr.len()) as a lower bound.
- Optional future: use rayon to parallelize per-element processing if CPU-bound and safe (PR mentions rayon was added elsewhere).
Minimal pre-allocation diff:
 fn process_partitioned_array(
@@
 ) -> Result<Vec<Value>, anyhow::Error> {
-    let mut result = Vec::new();
+    let mut result = Vec::new();
+    result.reserve(arr.len()); // lower bound; items can expand
If/when you choose to parallelize, you can adopt rayon and then flatten:
    // Outside this hunk, add once:
    use rayon::prelude::*;

    let chunks: Result<Vec<Vec<Value>>, _> = arr
        .into_par_iter()
        .map(|element| {
            process_partitioned_element(
                element,
                time_partition,
                time_partition_limit,
                custom_partition,
                schema_version,
                log_source,
            )
        })
        .collect();
    let result = chunks?.into_iter().flatten().collect();
483-515: Add a negative test: missing time-partition field should reject the whole batch
Given the PR objective (reject batch if any event lacks the partition field), add a test where one element omits source_time and assert that convert_array_to_object returns Err.
Add alongside this test:
    #[test]
    fn test_convert_array_to_object_with_time_partition_missing_field_rejects_batch() {
        let json = json!([
            { "a": "b", "source_time": "2025-08-01T00:00:00.000Z" },
            { "a": "b" } // missing required partition field
        ]);
        let time_partition = Some("source_time".to_string());
        let result = convert_array_to_object(
            json,
            time_partition.as_ref(),
            None,
            None,
            SchemaVersion::V0,
            &crate::event::format::LogSource::default(),
        );
        assert!(result.is_err(), "Batch should be rejected if any event lacks time partition field");
    }
517-545: Consider adding a SchemaVersion::V1 case to exercise generic flattening gating
This test validates the new non-partitioned batching, but only for V0. A small V1 case with shallow nesting would exercise should_apply_generic_flattening and guard against regressions in the gating condition.
Example to add near this test:
    #[test]
    fn test_non_partitioned_v1_generic_flattening_applies() {
        // Single object with an array to be expanded by generic_flattening
        let json = json!({ "id": 1, "items": [ {"x": 10}, {"x": 20} ] });
        let result = convert_array_to_object(
            json.clone(),
            None,
            None,
            None,
            SchemaVersion::V1,
            &crate::event::format::LogSource::Json, // matches gating
        );
        assert!(result.is_ok());
        let out = result.unwrap();
        assert_eq!(out.len(), 1, "Non-partitioned path should still return a single batch item");
        // And the single batch item should be an array with expanded objects
        // depending on flatten_json_body's V1 behavior.
        // You can assert shape here based on expected expansion semantics.
    }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (11)
- Cargo.toml (1 hunks)
- src/catalog/mod.rs (4 hunks)
- src/handlers/http/ingest.rs (3 hunks)
- src/handlers/http/modal/utils/ingest_utils.rs (4 hunks)
- src/handlers/mod.rs (1 hunks)
- src/parseable/mod.rs (2 hunks)
- src/parseable/staging/writer.rs (2 hunks)
- src/parseable/streams.rs (2 hunks)
- src/storage/field_stats.rs (4 hunks)
- src/storage/object_storage.rs (5 hunks)
- src/utils/json/mod.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-06-16T09:50:38.636Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1346
File: src/parseable/streams.rs:319-331
Timestamp: 2025-06-16T09:50:38.636Z
Learning: In Parseable's Ingest or Query mode, the node_id is always available because it's generated during server initialization itself, before the get_node_id_string() function in streams.rs would be called. This makes the .expect() calls on QUERIER_META.get() and INGESTOR_META.get() safe in this context.
Applied to files:
src/parseable/streams.rs
🧬 Code Graph Analysis (6)
src/utils/json/mod.rs (4)
- src/utils/json/flatten.rs (4): has_more_than_max_allowed_levels (335-348), custom_partition (104-104), generic_flattening (269-328), flatten (58-93)
- src/parseable/mod.rs (3): custom_partition (751-751), custom_partition (923-923), new (146-158)
- src/storage/mod.rs (3): new (192-194), new (205-211), default (215-236)
- src/event/format/mod.rs (1): new (126-131)
src/storage/object_storage.rs (5)
- src/parseable/streams.rs (1): new (114-131)
- src/parseable/mod.rs (3): new (146-158), serde_json (300-300), serde_json (306-306)
- src/catalog/manifest.rs (1): create_from_parquet_file (91-124)
- src/storage/field_stats.rs (1): calculate_field_stats (79-130)
- src/catalog/mod.rs (1): update_snapshot (111-128)
src/handlers/http/modal/utils/ingest_utils.rs (2)
- src/utils/json/mod.rs (1): convert_array_to_object (219-256)
- src/static_schema.rs (1): custom_partition (73-73)
src/handlers/http/ingest.rs (1)
- src/handlers/http/modal/utils/ingest_utils.rs (1): flatten_and_push_logs (51-131)
src/parseable/mod.rs (1)
- src/static_schema.rs (1): custom_partition (73-73)
src/catalog/mod.rs (3)
- src/storage/object_storage.rs (2): new (83-92), manifest_path (1216-1236)
- src/stats.rs (4): event_labels (218-220), event_labels_date (226-232), storage_size_labels_date (234-236), get_current_stats (51-109)
- src/catalog/manifest.rs (1): default (69-74)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
🔇 Additional comments (16)
Cargo.toml (1)
126-126: Rayon dependency addition looks good.
No immediate compatibility or security concerns. Keep an eye on binary size and parallelism defaults where you adopt Rayon to avoid oversubscription (Tokio + Rayon).
If you intend to gate Rayon-backed paths, consider making it an optional feature to avoid pulling it into minimal deployments. Want me to draft a feature-gated dependency stanza?
src/parseable/streams.rs (1)
276-276: ULID-based grouping token: sensible choice for collision-free parquet names.
Using a per-run ULID avoids cross-run collisions and keeps grouping deterministic for a single pass.
src/parseable/mod.rs (4)
538-544: Mutual exclusivity between time and custom partitions: good guardrail.
This aligns with the new semantics and avoids ambiguous layouts. Keep it.
Confirm that the ingestion paths reject payloads that contain both fields when stream metadata already sets one (including updates). If needed, I can scan the codebase and propose harmonized checks at ingestion boundaries.
871-900: Static schema validation path is correct.
Passing time_partition and custom_partition through to schema conversion ensures early validation of field presence for static schema streams.
902-920: Time-partition limit parsing covers format and zero checks.
Error messages are clear and actionable. No further changes needed.
531-576: Double-check end-to-end enforcement that every event carries the specified time-partition field.
The PR states batches should be rejected if any event lacks the specified time-partition field. This file handles metadata and creation, but not the ingest-time validation.
Would you like me to verify and, if needed, add a guard in the ingest pipeline (e.g., in flatten_and_push_logs/push_logs) to enforce this invariant at batch boundaries?
src/handlers/mod.rs (1)
29-35: Making header constants public is appropriate.
This enables cross-module access to time-partition and related headers without duplication. Keep the constants centralized here.
src/storage/field_stats.rs (1)
126-127: Signature sync with new time_partition param looks correct
Passing None into flatten_and_push_logs aligns with the updated signature and preserves the previous behavior for dataset-stats.
src/handlers/http/ingest.rs (3)
118-133: Good: explicit stream retrieval and error propagation
Matching on PARSEABLE.get_stream and propagating failures removes the prior ambiguity. The OTEL format compatibility check is preserved.
410-425: Same issue in post_event: hard block + not passing time_partition
Mirror the fix applied in ingest: remove the block and pass effective time partition to flatten_and_push_logs.
-    if stream.get_time_partition().is_some() {
-        return Err(PostError::CustomError(
-            "Ingestion is not allowed to stream with time partition".to_string(),
-        ));
-    }
-
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
+    let time_partition = req
+        .headers()
+        .get("x-p-time-partition")
+        .and_then(|h| h.to_str().ok())
+        .map(|s| s.to_string())
+        .or_else(|| stream.get_time_partition());
+
+    flatten_and_push_logs(
+        json,
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+        time_partition,
+    )
+    .await?;
⛔ Skipped due to learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
55-67: Time-partition headers are already handled at stream creation
The ingest handler pulls the stream's time_partition and time_partition_limit from its metadata (via PARSEABLE.get_stream…get_time_partition_limit()) and passes them into flatten_and_push_logs for validation. The raw X-P-Time-Partition and X-P-Time-Partition-Limit headers are parsed when the stream is created or updated in
- src/handlers/http/modal/utils/logstream_utils.rs (see TIME_PARTITION_KEY/TIME_PARTITION_LIMIT_KEY in PutStreamHeaders)
No changes to the ingest endpoint are required.
src/handlers/http/modal/utils/ingest_utils.rs (1)
51-57: Public API extension is sensible
Adding time_partition: Option<String> to flatten_and_push_logs keeps backward compatibility and enables per-request overrides.
src/catalog/mod.rs (1)
111-128: Batch-oriented snapshot update: good structure and early return
Switching to a Vec of changes and early exit is clean and unlocks per-partition processing.
src/storage/object_storage.rs (1)
75-93: UploadContext is a good encapsulation
The context wrapper simplifies passing stream-derived info across the pipeline.
src/utils/json/mod.rs (2)
126-145: Per-element generic-flatten gating may yield mixed shapes within a single batch
In partitioned mode, generic_flattening is decided per element, whereas the non-partitioned path gates on the whole body. This can produce heterogeneous outputs (some events expanded via generic flattening while others aren't) within the same batch if depth varies across elements. If downstream expects a uniform schema per batch, this can be surprising.
Please confirm that this behavioral change is intentional and acceptable for consumers. If uniformity is required, consider evaluating the gating once for the entire array and applying the same policy to all elements in that batch.
219-256: Routing logic LGTM
The partition/non-partition routing is clear, and validation is always enabled in the partitioned path. This aligns with rejecting the entire batch when a single event violates partition requirements.
Actionable comments posted: 0
🧹 Nitpick comments (7)
src/query/mod.rs (3)
582-615: Always enforce server time bounds per table; don't skip just because "some" time filter exists
Right now, if any time filter exists on the scan, no server-side time window is added. This can miss adding the missing bound (e.g., only >= start is present, but < end is not), causing larger-than-necessary scans. It's safe and simpler to always apply the request's [start, end) bounds; redundant predicates are benign and DF can still optimize.
Apply this diff to always add the filters and avoid the "any-time-filter" gate:
-    let mut new_filters = vec![];
-    if !table_contains_any_time_filters(&table, time_partition.as_ref()) {
-        let default_timestamp = DEFAULT_TIMESTAMP_KEY.to_string();
-        let time_column = time_partition.as_ref().unwrap_or(&default_timestamp);
-
-        // Create time filters with table-qualified column names
-        let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(
-            start_time,
-        ))
-        .binary_expr(Expr::Column(Column::new(
-            Some(table.table_name.to_owned()),
-            time_column.clone(),
-        )));
-
-        let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(
-            end_time,
-        ))
-        .binary_expr(Expr::Column(Column::new(
-            Some(table.table_name.to_owned()),
-            time_column.clone(),
-        )));
-
-        new_filters.push(start_time_filter);
-        new_filters.push(end_time_filter);
-    }
+    // Always add the server-side time window to constrain scans.
+    let default_timestamp = DEFAULT_TIMESTAMP_KEY.to_string();
+    let time_column = time_partition.as_ref().unwrap_or(&default_timestamp);
+
+    let mut new_filters = vec![];
+    // Create time filters with table-qualified column names
+    let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
+        .binary_expr(Expr::Column(Column::new(
+            Some(table.table_name.to_owned()),
+            time_column.clone(),
+        )));
+
+    let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
+        .binary_expr(Expr::Column(Column::new(
+            Some(table.table_name.to_owned()),
+            time_column.clone(),
+        )));
+
+    new_filters.push(start_time_filter);
+    new_filters.push(end_time_filter);
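The claim that redundant time predicates are benign can be illustrated with a toy model. The sketch below is not DataFusion code: `in_time_window` and `scan` are hypothetical helpers operating on plain millisecond timestamps, and the user's filter is modelled as a closure.

```rust
/// Returns true when `timestamp_ms` falls in the half-open window [start, end).
fn in_time_window(timestamp_ms: i64, start_ms: i64, end_ms: i64) -> bool {
    start_ms <= timestamp_ms && timestamp_ms < end_ms
}

/// Applies a user predicate together with the server window. If the user
/// predicate already implies the window, the extra check changes nothing.
fn scan(rows: &[i64], start_ms: i64, end_ms: i64, user_filter: impl Fn(i64) -> bool) -> Vec<i64> {
    rows.iter()
        .copied()
        .filter(|&ts| user_filter(ts) && in_time_window(ts, start_ms, end_ms))
        .collect()
}
```

A user filter with only a lower bound still gets the upper bound from the server window, while a filter that already states both bounds returns the same rows as before.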
619-623: Avoid unwrap on Filter::try_new to prevent panics; fall back gracefully
If a stream is misconfigured or the time column is absent, Filter::try_new can error. Panicking here crashes query execution. Return the scan unchanged on error.
-    let filter =
-        Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table)))
-            .unwrap();
-    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
+    match Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))) {
+        Ok(filter) => Ok(Transformed::yes(LogicalPlan::Filter(filter))),
+        Err(_) => Ok(Transformed::no(LogicalPlan::TableScan(table))),
+    }
641-657: Detect time filters more robustly (check both sides and match the intended column)
Current check only inspects the left-hand side and compares just the column name. Improve robustness by checking both sides and still matching the time column name.
-    table
-        .filters
-        .iter()
-        .filter_map(|x| {
-            if let Expr::BinaryExpr(binexpr) = x {
-                Some(binexpr)
-            } else {
-                None
-            }
-        })
-        .any(|expr| {
-            matches!(&*expr.left, Expr::Column(Column { name, .. })
-            if name == time_column)
-        })
+    table
+        .filters
+        .iter()
+        .filter_map(|x| {
+            if let Expr::BinaryExpr(binexpr) = x {
+                Some(binexpr)
+            } else {
+                None
+            }
+        })
+        .any(|expr| {
+            match (&*expr.left, &*expr.right) {
+                (Expr::Column(Column { name, .. }), _) if name == time_column => true,
+                (_, Expr::Column(Column { name, .. })) if name == time_column => true,
+                _ => false,
+            }
+        })
src/handlers/airplane.rs (1)
138-154: Ensure streams are registered in Query/Prism modes before executing
Airplane path resolves table names but doesn't create/attach streams in-memory like the HTTP path does. Given GlobalSchemaProvider.table() expects streams to exist, consider registering them (especially in Query/Prism) to avoid "stream not found" errors.
Proposed addition (outside the changed hunk) before building the query:
    use crate::handlers::http::query::create_streams_for_distributed;

    // After `let streams = resolve_stream_names(&ticket.query)?;`
    if PARSEABLE.options.mode == crate::option::Mode::Query
        || PARSEABLE.options.mode == crate::option::Mode::Prism
    {
        if let Err(e) = create_streams_for_distributed(streams.clone()).await {
            return Err(Status::failed_precondition(format!("Failed to prepare streams: {e}")));
        }
    }
This mirrors the pattern used in the HTTP handler and aligns with prior guidance to verify stream existence in both query and standalone modes.
src/handlers/http/query.rs (3)
207-216: Avoid potential panic on empty table list when labeling metrics
Indexing with [0] will panic if the table list becomes empty upstream (e.g., an edge-case query with no table scan). Prefer safe access with a descriptive error.

     async fn handle_non_streaming_query(
         query: LogicalQuery,
         table_name: Vec<String>,
         query_request: &Query,
         time: Instant,
     ) -> Result<HttpResponse, QueryError> {
    -    let first_table_name = table_name[0].clone();
    +    let first_table_name = table_name
    +        .get(0)
    +        .cloned()
    +        .ok_or(QueryError::MalformedQuery("No table name found in query"))?;
         let (records, fields) = execute(query, query_request.streaming).await?;
    @@
         QUERY_EXECUTE_TIME
             .with_label_values(&[&first_table_name])
             .observe(time);

Also applies to: 228-230
260-266: Same here: safe first-table extraction for streaming path
Mirror the non-streaming fix to avoid panics and keep labeling robust.

     async fn handle_streaming_query(
         query: LogicalQuery,
         table_name: Vec<String>,
         query_request: &Query,
         time: Instant,
     ) -> Result<HttpResponse, QueryError> {
    -    let first_table_name = table_name[0].clone();
    +    let first_table_name = table_name
    +        .get(0)
    +        .cloned()
    +        .ok_or(QueryError::MalformedQuery("No table name found in query"))?;
         let (records_stream, fields) = execute(query, query_request.streaming).await?;
    @@
         QUERY_EXECUTE_TIME
             .with_label_values(&[&first_table_name])
             .observe(time);

Also applies to: 276-278
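The safe first-table extraction suggested for both query paths can be tried in isolation. The following is a minimal sketch, not the project's code: `QueryError` here is a hypothetical stand-in that models only the one variant named in the review, and `first_table_name` is an illustrative helper name.

```rust
/// Hypothetical stand-in for the project's `QueryError`; only the variant
/// mentioned in the review comment is modelled here.
#[derive(Debug, PartialEq)]
enum QueryError {
    MalformedQuery(&'static str),
}

/// Returns the first table name, or an error when the list is empty,
/// instead of panicking the way `table_names[0]` would.
fn first_table_name(table_names: &[String]) -> Result<String, QueryError> {
    table_names
        .first()
        .cloned()
        .ok_or(QueryError::MalformedQuery("No table name found in query"))
}
```

Factoring the lookup into one helper would let the streaming and non-streaming handlers share the same error message rather than repeating the chain in each.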
201-206: Docs drift: parameter now carries multiple tables
The doc comments still describe a single table_name, but the functions accept `Vec<String>`. Update the docs to prevent confusion.
Also applies to: 250-258
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
- src/alerts/alerts_utils.rs (1 hunks)
- src/handlers/airplane.rs (1 hunks)
- src/handlers/http/query.rs (6 hunks)
- src/query/mod.rs (7 hunks)
- src/query/stream_schema_provider.rs (0 hunks)
💤 Files with no reviewable changes (1)
- src/query/stream_schema_provider.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/query/mod.rs
🧬 Code Graph Analysis (4)
src/alerts/alerts_utils.rs (1)
- src/query/mod.rs (2): execute (78-92), execute (179-212)
src/handlers/airplane.rs (1)
- src/query/mod.rs (2): execute (78-92), execute (179-212)
src/query/mod.rs (4)
- src/query/stream_schema_provider.rs (1): table (87-100)
- src/parseable/streams.rs (1): new (114-131)
- src/parseable/mod.rs (1): new (146-158)
- src/metadata.rs (1): new (97-134)
src/handlers/http/query.rs (2)
- src/query/mod.rs (2): execute (78-92), execute (179-212)
- src/query/stream_schema_provider.rs (1): table (87-100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (4)
src/handlers/airplane.rs (1)
208-210: Signature update LGTM
Swapping to execute(query, false) matches the new API and keeps the downstream logic intact.
src/alerts/alerts_utils.rs (1)
110-113: Signature update LGTM
The switch to execute(query, false) aligns with the refactored public API. Upstream stream preparation is already handled earlier in this function.
src/handlers/http/query.rs (2)
97-99: Signature update LGTM
The change to execute(query, false) matches the new query API and the surrounding logic remains consistent.
128-132: Count path: safe first-table extraction LGTM
You guard the empty case with a proper error; extracting the first table name for counts is fine.
Actionable comments posted: 3
🔭 Outside diff range comments (2)
src/handlers/http/modal/utils/ingest_utils.rs (2)
138-170: Regression: stream-configured time partition no longer takes effect; restore fallback and minor perf cleanups
`push_logs` used to derive `time_partition` from the stream. After this change, it only uses the passed-in value; with call sites passing `None`, time partition is silently ignored and validation never runs.
Fix:
- Compute `effective_time_partition = time_partition.or_else(|| stream.get_time_partition())`.
- Use `stream` for `time_partition_limit` and `schema` to avoid repeated lookups.

     pub async fn push_logs(
         stream_name: &str,
         json: Value,
         log_source: &LogSource,
         p_custom_fields: &HashMap<String, String>,
         time_partition: Option<String>,
     ) -> Result<(), PostError> {
         let stream = PARSEABLE.get_stream(stream_name)?;
    -    let time_partition_limit = PARSEABLE
    -        .get_stream(stream_name)?
    -        .get_time_partition_limit();
    +    let time_partition_limit = stream.get_time_partition_limit();
         let static_schema_flag = stream.get_static_schema_flag();
         let custom_partition = stream.get_custom_partition();
         let schema_version = stream.get_schema_version();
         let p_timestamp = Utc::now();

    -    let data = convert_array_to_object(
    -        json,
    -        time_partition.as_ref(),
    -        time_partition_limit,
    -        custom_partition.as_ref(),
    -        schema_version,
    -        log_source,
    -    )?;
    +    // Fallback to stream-configured time partition if caller did not supply one
    +    let effective_time_partition = time_partition.or_else(|| stream.get_time_partition());
    +
    +    let data = convert_array_to_object(
    +        json,
    +        effective_time_partition.as_ref(),
    +        time_partition_limit,
    +        custom_partition.as_ref(),
    +        schema_version,
    +        log_source,
    +    )?;

    +    let schema = stream.get_schema_raw();
         for json in data {
             let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
    -        let schema = PARSEABLE.get_stream(stream_name)?.get_schema_raw();
             json::Event { json, p_timestamp }
                 .into_event(
                     stream_name.to_owned(),
                     origin_size,
                     &schema,
                     static_schema_flag,
                     custom_partition.as_ref(),
    -                time_partition.as_ref(),
    +                effective_time_partition.as_ref(),
                     schema_version,
                     StreamType::UserDefined,
                     p_custom_fields,
                 )?
                 .process()?;
         }
         Ok(())
     }
51-57: Action Required: Wire `time_partition` Across All `flatten_and_push_logs` Call-Sites
The new signature for `flatten_and_push_logs` takes five parameters, including `p_custom_fields` and an explicit `time_partition`, so we must ensure no call-site still defaults to `None` or omits arguments. Update the following locations:
• src/handlers/http/ingest.rs
- Line 133 and Line 408: currently calling `flatten_and_push_logs(..., &p_custom_fields, None).await?;` → Replace `None` with the actual `time_partition` value you derive.
• src/handlers/http/modal/ingest/ingestor_ingest.rs
- Line 41: calling with only three arguments (`req, body, stream_name`) → Expand to five arguments: `flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, time_partition).await?;`
• src/storage/field_stats.rs
- Line 121: verify that you’re passing both a `&HashMap<String, String>` and the correct `time_partition` here, instead of relying on defaults.
The internal `push_logs` calls in `src/handlers/http/modal/utils/ingest_utils.rs` remain unaffected, as they do not accept a `time_partition`.
♻️ Duplicate comments (4)
src/handlers/http/ingest.rs (2)
127-134: Remove hard block and plumb an effective time_partition (header override → stream default) into flattening/push path
Blocking ingestion when a stream has a time partition contradicts the PR objective (enable custom time partitions) and disables validation of the required field at the right layer. Let `flatten_and_push_logs`/`push_logs` enforce “field present in every event” by providing the effective `time_partition`.
Also note: returning `PostError::CustomError` maps to 500; this is a client-side error and should be 400. Prefer a typed error variant that maps to BAD_REQUEST.
Apply the following changes:
- Remove the hard block.
- Compute `time_partition` from header `x-p-time-partition` with fallback to `stream.get_time_partition()`.
- Pass it to `flatten_and_push_logs`.

    -if stream.get_time_partition().is_some() {
    -    return Err(PostError::CustomError(
    -        "Ingestion is not allowed to stream with time partition".to_string(),
    -    ));
    -}
    -
    -flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
    +// Prefer header override, else fall back to stream-configured time partition
    +let time_partition = req
    +    .headers()
    +    .get("x-p-time-partition")
    +    .and_then(|h| h.to_str().ok())
    +    .map(|s| s.to_string())
    +    .or_else(|| stream.get_time_partition());
    +
    +flatten_and_push_logs(
    +    json,
    +    &stream_name,
    +    &log_source,
    +    &p_custom_fields,
    +    time_partition,
    +)
    +.await?;
400-409: Same issue as ingest(): remove hard block and pass effective time_partition
This block prevents the feature altogether on this endpoint and returns a 500 via CustomError. Remove the guard, derive the effective time partition from header → stream default, and pass it to `flatten_and_push_logs`.

     let stream = validate_stream_for_ingestion(&stream_name)?;
    -
    -if stream.get_time_partition().is_some() {
    -    return Err(PostError::CustomError(
    -        "Ingestion is not allowed to stream with time partition".to_string(),
    -    ));
    -}
    -
    -flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
    +
    +// Prefer header override, else fall back to stream-configured time partition
    +let time_partition = req
    +    .headers()
    +    .get("x-p-time-partition")
    +    .and_then(|h| h.to_str().ok())
    +    .map(|s| s.to_string())
    +    .or_else(|| stream.get_time_partition());
    +
    +flatten_and_push_logs(
    +    json,
    +    &stream_name,
    +    &log_source,
    +    &p_custom_fields,
    +    time_partition,
    +)
    +.await?;

src/storage/object_storage.rs (1)
145-166: Fix brittle date extraction that can panic.
The current implementation uses unchecked array indexing, which will panic if the filename format deviates from expectations. This is a known issue from previous reviews.
Apply this diff to handle malformed filenames gracefully:

     fn update_storage_metrics(
         path: &std::path::Path,
         stream_name: &str,
         filename: &str,
     ) -> Result<(), ObjectStorageError> {
    -    let mut file_date_part = filename.split('.').collect::<Vec<&str>>()[0];
    -    file_date_part = file_date_part.split('=').collect::<Vec<&str>>()[1];
    +    // Extract date from filename format: "date=YYYY-MM-DD.suffix"
    +    let file_date_part = filename
    +        .split('.')
    +        .next()
    +        .and_then(|part| part.split('=').nth(1))
    +        .ok_or_else(|| {
    +            ObjectStorageError::Custom(format!("Invalid filename format: {}", filename))
    +        })?;

src/catalog/mod.rs (1)
341-354: Wrong meta used when creating a manifest for non-matching partition.
When the manifest path doesn't match the current node's pattern (lines 341-354), the code uses `ObjectStoreFormat::default()` instead of the actual stream metadata. This loses stream-specific settings like time_partition.
Apply this diff to use the correct metadata:

     } else {
         // Create new manifest for different partition
         create_manifest(
             partition_lower,
             partition_changes,
             storage,
             stream_name,
             false,
    -        ObjectStoreFormat::default(),
    +        meta.clone(),
             events_ingested,
             ingestion_size,
             storage_size,
         )
         .await
     }
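Returning to the date-extraction fix suggested for `update_storage_metrics` among the duplicate comments above: the non-panicking chain can be exercised on its own. This is a sketch only, assuming the filename shape `date=YYYY-MM-DD.<suffix>` stated in the review's code comment; `extract_file_date` is a hypothetical helper name, and the empty-value check is an extra guard that is not part of the reviewed diff.

```rust
/// Extracts the date from a filename assumed to look like
/// `date=YYYY-MM-DD.<suffix>`. Returns `None` for any other shape
/// instead of panicking on an out-of-bounds index.
fn extract_file_date(filename: &str) -> Option<&str> {
    filename
        .split('.')
        .next() // text before the first '.'
        .and_then(|part| part.split('=').nth(1)) // text after the first '='
        .filter(|date| !date.is_empty()) // extra guard: reject `date=` with no value
}
```

A caller that needs an error rather than an `Option` can convert the result with `ok_or_else`, as the reviewed diff does.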
🧹 Nitpick comments (7)
src/handlers/http/ingest.rs (3)
121-126: Avoid discarding the returned Stream; reuse it to derive effective time partition
You fetch `stream` via `validate_stream_for_ingestion` but don’t use it afterward. It’s useful to:
- Avoid an extra lookup later.
- Compute the effective time partition (header override or stream default) once and pass it downstream.
This also helps eliminate subsequent duplicate checks.
251-275: OTEL endpoints: align validation/gating strategy across all ingestion paths
Currently:
- ingest()/post_event block time-partitioned streams via validation helper + explicit guard.
- OTEL endpoints don’t apply the same gating and pass `None` for partition.
Decide one consistent policy:
- If the feature is enabled (per this PR), remove blocks and rely on downstream validation once time_partition is provided.
- If it’s gated (enterprise-only), enforce the same check in OTEL paths too (ideally centralized), and return a 4xx.
I can draft a follow-up to centralize this check.
Also applies to: 287-297, 315-337
52-92: Missing support for X-P-Time-Partition-Limit header
The PR description includes `X-P-Time-Partition-Limit`, but no code reads or forwards it; `push_logs` always uses the stream-configured limit. If per-request override is desired:
- Parse the header here.
- Plumb an optional limit through to `push_logs` and down to `convert_array_to_object`.
I can provide a patch that threads `Option<NonZeroU32>` through the APIs with minimal churn.
src/handlers/http/modal/utils/ingest_utils.rs (2)
47-49: Exclude control headers from custom fields to avoid polluting the dataset
Headers like `x-p-time-partition` and `x-p-time-partition-limit` are control signals; they shouldn’t be stored as event fields. Add them to `IGNORE_HEADERS`.

    -const IGNORE_HEADERS: [&str; 3] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY, EXTRACT_LOG_KEY];
    +const IGNORE_HEADERS: [&str; 5] = [
    +    STREAM_NAME_HEADER_KEY,
    +    LOG_SOURCE_KEY,
    +    EXTRACT_LOG_KEY,
    +    "x-p-time-partition",
    +    "x-p-time-partition-limit",
    +];

Also applies to: 192-234
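To illustrate why the ignore list matters, here is a rough sketch of header filtering. It rests on several assumptions: the first three header names are placeholders for `STREAM_NAME_HEADER_KEY`, `LOG_SOURCE_KEY`, and `EXTRACT_LOG_KEY` (their real values are not shown in this review), custom fields are assumed to come from `x-p-` prefixed headers, and `custom_fields_from_headers` is a hypothetical helper rather than the project's function.

```rust
use std::collections::HashMap;

// Placeholder values: the first three stand in for the project's header-key
// constants; the last two are the control headers named in the review.
const IGNORE_HEADERS: [&str; 5] = [
    "x-p-stream",
    "x-p-log-source",
    "x-p-extract-log",
    "x-p-time-partition",
    "x-p-time-partition-limit",
];

/// Collects `x-p-*` headers as custom fields, skipping control headers.
/// Header names are compared case-insensitively; keys are kept lowercased.
fn custom_fields_from_headers(headers: &[(&str, &str)]) -> HashMap<String, String> {
    headers
        .iter()
        .map(|(name, value)| (name.to_ascii_lowercase(), value.to_string()))
        .filter(|(name, _)| {
            name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&name.as_str())
        })
        .collect()
}
```

With the two partition headers in the list, a request carrying `x-p-time-partition` no longer adds a same-named column to every ingested event.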
294-350: Tests: add coverage for time-partition enforcement and header overrides
Please add tests that:
- Ingest with `x-p-time-partition: _ingested` succeeds when all records have `_ingested`; fails with BAD_REQUEST when any record is missing it.
- Stream-configured time partition applies when header is absent (fallback path).
- `x-p-time-partition`/`x-p-time-partition-limit` are not inserted into custom fields.
I can draft these if helpful.
src/storage/object_storage.rs (2)
1071-1083: Consider awaiting the spawned task or using a different pattern.
The stats sync spawns a detached task (line 1075) that may outlive the parent operation. If the parent crashes or shuts down, this task might not complete. Consider either awaiting it or using a more robust background task management system.

     async fn handle_stats_sync(stats_calculated: bool) {
         if stats_calculated {
             // perform local sync for the `pstats` dataset
    -        task::spawn(async move {
    +        let handle = task::spawn(async move {
                 if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME)
                     && let Err(err) = stats_stream.flush_and_convert(false, false)
                 {
                     error!("Failed in local sync for dataset stats stream: {err}");
                 }
             });
    +        // Optionally await with a timeout to ensure completion
    +        // let _ = tokio::time::timeout(Duration::from_secs(30), handle).await;
         }
     }
103-143: Add Context to Manifest Creation Errors
Errors from `catalog::create_from_parquet_file` currently bubble up as `ObjectStorageError::Invalid` without any reference to which file or stream failed. Adding context will make debugging much easier. For example:
• In `src/storage/object_storage.rs`, add at the top:

    use anyhow::Context;

• Around line 133, change:

    -let manifest = catalog::create_from_parquet_file(absolute_path, &path)?;
    +let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &path)
    +    .with_context(|| {
    +        format!(
    +            "failed to create manifest for parquet file {:?} (stream: {})",
    +            filename, stream_name
    +        )
    +    })?;

This way, if manifest creation fails, the error chain will include exactly which file and stream caused the problem.
📒 Files selected for processing (4)
- src/catalog/mod.rs (4 hunks)
- src/handlers/http/ingest.rs (4 hunks)
- src/handlers/http/modal/utils/ingest_utils.rs (7 hunks)
- src/storage/object_storage.rs (5 hunks)
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-08-18T12:30:23.342Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/handlers/http/ingest.rs
src/handlers/http/modal/utils/ingest_utils.rs
🧬 Code Graph Analysis (3)
src/storage/object_storage.rs (6)
- src/parseable/mod.rs (3): new (146-158), serde_json (300-300), serde_json (306-306)
- src/parseable/streams.rs (1): new (114-131)
- src/storage/s3.rs (2): from (850-858), from (862-864)
- src/catalog/manifest.rs (1): create_from_parquet_file (91-124)
- src/storage/field_stats.rs (1): calculate_field_stats (79-130)
- src/catalog/mod.rs (1): update_snapshot (111-128)
src/handlers/http/ingest.rs (2)
- src/handlers/http/modal/utils/ingest_utils.rs (2): validate_stream_for_ingestion (271-292), flatten_and_push_logs (51-131)
- src/validator.rs (1): stream_name (33-68)
src/handlers/http/modal/utils/ingest_utils.rs (1)
- src/utils/json/mod.rs (1): convert_array_to_object (219-256)
🔇 Additional comments (15)
src/handlers/http/ingest.rs (1)
32-32: Good reuse: centralizing ingestion validation is a step in the right direction
Importing and using `validate_stream_for_ingestion` reduces repeated validation logic across handlers.
src/handlers/http/modal/utils/ingest_utils.rs (4)
26-26: Arc import looks good
Needed for returning `Arc<Stream>` from the new validator.
42-42: Stream type import is appropriate
Importing `Stream` for the validator return type is correct.
51-57: API shape is fine, but call sites passing None disable partitioning/validation
Adding `time_partition: Option<String>` is good. However, all current call sites pass `None`, which, combined with the change in `push_logs` (see below), effectively disables time-partitioning and the “every event must contain the field” validation.
Action: ensure handlers compute and pass an effective time_partition (header override → stream default), or restore stream fallback inside `push_logs`.
67-127: OK to clone the Option for OTEL branches
Cloning `Option<String>` for per-record pushes is acceptable. Strings are small here; the cost is negligible compared to serialization work.
src/storage/object_storage.rs (5)
75-93: LGTM! Well-structured context management for uploads.
The `UploadContext` struct and its constructor properly encapsulate stream-related data needed for upload operations. Good design choice to preload the custom partition and schema to avoid repeated locking.
942-966: Good use of bounded concurrency with semaphore.
The implementation properly limits concurrent uploads to 100 using a semaphore, which prevents resource exhaustion. The JoinSet usage for managing tasks is also appropriate.
1054-1069: LGTM! Clean schema processing logic.
The schema file processing properly reads, commits, and cleans up files with appropriate error handling for the cleanup phase.
168-186: Review error-handling strategy for `calculate_stats_if_enabled`
Currently, any failure inside `calculate_stats_if_enabled` is traced and results in `false`, which your upload logic treats as “no stats calculated” rather than an error:
- In `src/storage/object_storage.rs` (around line 168), the function always returns `false` on `Err(_)`.
- At its call site (around line 175), you do `let stats_calculated = calculate_stats_if_enabled(...).await;` and include that bool in `UploadResult`.
If missing stats is acceptable, this silent failure may be fine. If dataset stats are critical to downstream logic, you should surface the error (e.g. change the signature to `-> Result<bool, Error>`, propagate the error, or otherwise alert the caller).
Please verify whether silent fallback to `false` aligns with your requirements or if errors should be propagated instead.
1085-1099: Couldn’t find any calls to `stream_relative_path` (no results from `rg -n "stream_relative_path"`). It’s unclear what format `filename` is expected to have; there are no existing `.parquet` fixtures, no call sites, and no tests to confirm the dot-count logic. Please verify manually that:
- Filenames always use `.` as a separator between all partition fields (and only between those fields).
- The number of dots in `filename` matches 3 by default (e.g. `year.month.day.suffix`) and 3 + `custom_partition_list.len()` when custom partitions are provided.
If you’re not certain of these invariants, add tests (or example fixtures) covering:
- No custom partitions (3 dots → 3 folders)
- 1+ custom partitions (e.g. 4 dots → 4 folders)
- Filenames with extra dots (e.g. in a name component)
src/catalog/mod.rs (5)
111-128: Good optimization with early return for empty changes.
The function properly checks for empty changes and returns early, avoiding unnecessary processing. The batch processing approach is well-structured.
130-158: Excellent use of Rayon for parallel partition grouping.
The parallel implementation using Rayon's `into_par_iter()` with proper fold and reduce operations is well-designed for performance. The HashMap accumulation pattern is correct.
260-288: Verify meta parameter usage in handle_existing_partition.
The function now correctly receives the meta parameter and uses it for creating manifests. Good fix from the previous review comment.
357-373: LGTM! Clean finalization logic.
The snapshot finalization properly extends the manifest list with new entries and updates statistics before persisting.
375-455: Good handling of first_event_at initialization.
The function properly initializes the first_event_at timestamp when creating the first manifest for a stream, correctly considering both custom and default time partitions.
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/handlers/http/ingest.rs (2)
121-121: Avoid binding unused value; rely on the validator’s side effects
The returned stream is only used by the redundant check below. After removing that block (see next comment), you can avoid the unused binding.
Apply this diff after removing the redundant block:

    -let stream = validate_stream_for_ingestion(&stream_name)?;
    +validate_stream_for_ingestion(&stream_name)?;

400-401: Avoid binding the returned stream value
Same as earlier: once the redundant check below is removed, the binding is unnecessary.
Apply this diff after removing the redundant block:

    -let stream = validate_stream_for_ingestion(&stream_name)?;
    +validate_stream_for_ingestion(&stream_name)?;
📒 Files selected for processing (1)
- src/handlers/http/ingest.rs (4 hunks)
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-08-18T12:30:23.342Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T14:48:46.282Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:256-258
Timestamp: 2025-08-18T14:48:46.282Z
Learning: OTEL streams (OtelLogs, OtelMetrics, OtelTraces) in Parseable are intentionally designed to not support time partition functionality. The time_partition parameter should always be None for OTEL ingestion paths in flatten_and_push_logs calls.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T12:37:47.703Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.703Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.
Applied to files:
src/handlers/http/ingest.rs
🧬 Code Graph Analysis (1)
src/handlers/http/ingest.rs (1)
- src/handlers/http/modal/utils/ingest_utils.rs (2): validate_stream_for_ingestion (271-292), flatten_and_push_logs (51-131)
🔇 Additional comments (5)
src/handlers/http/ingest.rs (5)
32-32: Good call: centralizing ingestion validation via validate_stream_for_ingestion
Importing and using a single validator improves consistency across endpoints.
133-133: Passing None for time_partition is correct in OSS
Given OSS gating of time-partitioned ingestion, passing None is expected here.
256-256: OTEL ingestion correctly skips time-partitioning
Intentionally passing None aligns with the product decision that OTEL streams don’t support time partitions.
408-408: Passing None for time_partition is consistent with OSS behavior
Matches the design choice to disable time-partitioned ingestion in OSS.
127-131: Redundant time-partition block here; also yields wrong HTTP status (500) via CustomError
validate_stream_for_ingestion already rejects streams with a time partition. Duplicating the check here is dead code. More importantly, the validator currently returns PostError::CustomError, which maps to 500 (server error) for what is a client-side restriction in OSS. This should be a 400.
Action:
- Remove this block and rely on the centralized validator.
- Update validate_stream_for_ingestion to return a 400-producing variant (e.g., PostError::Invalid) with a clear OSS gating message.
Apply this diff to remove the redundant local check:
    -    if stream.get_time_partition().is_some() {
    -        return Err(PostError::CustomError(
    -            "Ingestion is not allowed to stream with time partition".to_string(),
    -        ));
    -    }
Proposed change in src/handlers/http/modal/utils/ingest_utils.rs (outside this file) to fix the status code and unify the message:
    // inside validate_stream_for_ingestion(...)
    if stream.get_time_partition().is_some() {
        return Err(PostError::Invalid(anyhow::anyhow!(
            "Ingestion is not allowed for streams with time partition in Parseable OSS. Please upgrade to Parseable Enterprise to enable this feature"
        )));
    }
⛔ Skipped due to learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.703Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.
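To make the 500-versus-400 point in the 127-131 comment concrete, here is a minimal sketch of an error type whose variants map to different HTTP status codes. `IngestError`, `status_code`, and `reject_time_partitioned` are hypothetical names used only for illustration; they are not Parseable's `PostError` API, and the real mapping lives in the project's own error handling.

```rust
/// Hypothetical, simplified stand-in for an ingestion error type.
/// It is NOT Parseable's PostError; it only illustrates the status mapping.
#[derive(Debug)]
pub enum IngestError {
    /// Client-side problem, such as a feature that is gated in OSS.
    Invalid(String),
    /// Unexpected server-side failure.
    Custom(String),
}

impl IngestError {
    /// HTTP status code a handler would return for this error.
    pub fn status_code(&self) -> u16 {
        match self {
            IngestError::Invalid(_) => 400,
            IngestError::Custom(_) => 500,
        }
    }
}

/// Rejects ingestion when the stream has a time partition configured,
/// using the client-error variant so callers see a 400 rather than a 500.
pub fn reject_time_partitioned(time_partition: Option<&str>) -> Result<(), IngestError> {
    if time_partition.is_some() {
        return Err(IngestError::Invalid(
            "Ingestion is not allowed to stream with time partition".to_string(),
        ));
    }
    Ok(())
}
```

With this shape, the choice of variant at the point of rejection decides the status code, so a policy restriction surfaces as a client error without changes in the handler.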
Actionable comments posted: 3
🧹 Nitpick comments (1)
src/handlers/http/modal/utils/ingest_utils.rs (1)
285-289: Consistency nit: unify OSS gating message for time-partitioned streams
Elsewhere the message was “Ingestion is not allowed to stream with time partition”. Consider using a single consistent message across the codebase to avoid surprises in logs/tests.
    -    "Ingestion with time partition is not supported in Parseable OSS"
    +    "Ingestion is not allowed to stream with time partition"
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
src/handlers/http/ingest.rs (4 hunks)
src/handlers/http/modal/utils/ingest_utils.rs (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/handlers/http/ingest.rs
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
📚 Learning: 2025-08-18T14:48:46.282Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:256-258
Timestamp: 2025-08-18T14:48:46.282Z
Learning: OTEL streams (OtelLogs, OtelMetrics, OtelTraces) in Parseable are intentionally designed to not support time partition functionality. The time_partition parameter should always be None for OTEL ingestion paths in flatten_and_push_logs calls.
Applied to files:
src/handlers/http/modal/utils/ingest_utils.rs
📚 Learning: 2025-08-18T12:37:47.703Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.703Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.
Applied to files:
src/handlers/http/modal/utils/ingest_utils.rs
📚 Learning: 2025-08-18T12:30:23.342Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
Applied to files:
src/handlers/http/modal/utils/ingest_utils.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/handlers/http/modal/utils/ingest_utils.rs
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.
Applied to files:
src/handlers/http/modal/utils/ingest_utils.rs
🧬 Code Graph Analysis (1)
src/handlers/http/modal/utils/ingest_utils.rs (5)
src/otel/logs.rs (1)
flatten_otel_logs (202-209)
src/otel/traces.rs (1)
flatten_otel_traces (160-167)
src/otel/metrics.rs (1)
flatten_otel_metrics (585-596)
src/utils/json/mod.rs (1)
convert_array_to_object (219-256)
src/static_schema.rs (1)
custom_partition (73-73)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
🔇 Additional comments (3)
src/handlers/http/modal/utils/ingest_utils.rs (3)
56-56: Threading time_partition parameter: good propagation, but confirm intended source-of-truth
Passing time_partition through both flatten_and_push_logs and push_logs looks correct. However, since push_logs no longer derives a fallback from the stream configuration, ensure that this is intentional for enterprise builds where a stream-level default time partition may exist. If a fallback to the stream’s configured time partition is still desired (when the header is absent), see my suggestion on Lines 149-156,168.
Also applies to: 138-138
67-75: Kinesis path propagation LGTM
Forwarding the optional time_partition to push_logs for Kinesis is correct and consistent with the other branches.
118-127: Default path propagation LGTM
Passing the optional time_partition through the generic path is consistent.
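The propagation discussed in these comments can be sketched as follows: the entry point decides the time partition, and the lower-level push step uses exactly what it is given without consulting the stream configuration. All names here (`Caller`, `flatten_and_push_logs_sketch`, `push_logs_sketch`) are hypothetical simplifications and do not mirror the real signatures in ingest_utils.rs.

```rust
/// Hypothetical description of who is calling the ingestion entry point.
pub enum Caller<'a> {
    /// OSS log ingestion: time partition is never used.
    Oss,
    /// OTEL ingestion: time partition is never used.
    Otel,
    /// Enterprise ingestion: may or may not carry a time partition field.
    Enterprise(Option<&'a str>),
}

/// Stand-in for the lower-level push step. It applies the value it receives
/// and deliberately has no fallback to a stream-level setting.
pub fn push_logs_sketch(stream: &str, time_partition: Option<&str>) -> String {
    match time_partition {
        Some(field) => format!("{stream}: partition by '{field}'"),
        None => format!("{stream}: default partitioning"),
    }
}

/// Stand-in for the entry point: the decision about time_partition is made
/// here, once, and then passed down unchanged.
pub fn flatten_and_push_logs_sketch(stream: &str, caller: Caller<'_>) -> String {
    let time_partition = match caller {
        Caller::Oss | Caller::Otel => None,
        Caller::Enterprise(field) => field,
    };
    push_logs_sketch(stream, time_partition)
}
```

Keeping the decision at the entry point means the push step can be reused unchanged by builds that do pass a value.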
Actionable comments posted: 1
🧹 Nitpick comments (4)
src/handlers/http/ingest.rs (4)
163-163: Clarify OTEL’s relationship to time_partition and avoid tuple-footgun
Returning an optional time_partition from setup_otel_stream is fine for Enterprise plumbing, but in OSS and for OTEL ingestion we intentionally ignore it. Consider documenting that explicitly to prevent accidental use in OTEL paths. Also consider a small struct (e.g., SetupOtelResult) over a positional 4-tuple to reduce misuse risk later.
Apply this lightweight doc diff:
     pub async fn setup_otel_stream(
         req: &HttpRequest,
         expected_log_source: LogSource,
         known_fields: &[&str],
         telemetry_type: TelemetryType,
    -) -> Result<(String, LogSource, LogSourceEntry, Option<String>), PostError> {
    +    // Returns: (stream_name, log_source, log_source_entry, time_partition)
    +    // Note: OTEL ingestion paths in OSS intentionally ignore time_partition and always
    +    // pass None downstream. The fourth element exists for Enterprise builds.
    +    //
    +    // Consider migrating to a named struct to avoid positional tuple confusion.
    +) -> Result<(String, LogSource, LogSourceEntry, Option<String>), PostError> {
    @@
    -    let mut time_partition = None;
    +    // For Enterprise builds, we surface the stream's configured time partition; OTEL
    +    // paths in OSS must ignore this (see process_otel_content).
    +    let mut time_partition = None;
    @@
    -        time_partition = stream.get_time_partition();
    +        time_partition = stream.get_time_partition();
    @@
    -    Ok((stream_name, log_source, log_source_entry, time_partition))
    +    Ok((stream_name, log_source, log_source_entry, time_partition))
Also applies to: 193-229
247-254: Confirmed: OTEL JSON path correctly forces time_partition=None in OSS
Explicitly passing None ensures time-partition validation is not applied on OTEL ingestion, as intended for OSS.
Add a clarifying comment:
     flatten_and_push_logs(
         serde_json::from_slice(&body)?,
         stream_name,
         log_source,
         &p_custom_fields,
    -    None,
    +    None, // OTEL + OSS: time partition not supported
     )
283-289: Handlers intentionally ignore returned time_partition; that’s fine, but document intent
Destructuring (stream_name, log_source, _, _) and ignoring time_partition is consistent with the OTEL restriction. A brief comment will help future readers avoid accidentally plumbing it through OTEL paths in OSS.
Apply this tiny diff in each handler:
    -    let (stream_name, log_source, _, _) = setup_otel_stream(
    +    // Ignore time_partition in OTEL + OSS
    +    let (stream_name, log_source, _, _) = setup_otel_stream(
Also applies to: 303-309, 323-329
396-399: Good: post_event uses the shared validator and enforces OSS gating
validate_stream_for_ingestion centralizes policy; passing None keeps OSS semantics. Consider adding a test mirroring the ingest() test to cover post_event as well (expect HTTP 400 with the standardized OSS message).
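The named-struct idea raised in the 163-163 nitpick could look roughly like the sketch below. `SetupOtelResult` is the name floated in the comment; the field types are simplified to `String` here as an assumption (the real code uses `LogSource` and `LogSourceEntry`), and `oss_otel_ingest_args` is a hypothetical helper added only to show how a caller reads fields by name.

```rust
/// Sketch of a named result replacing the positional 4-tuple.
/// Field types are simplified placeholders, not the real Parseable types.
#[derive(Debug, Clone, PartialEq)]
pub struct SetupOtelResult {
    pub stream_name: String,
    /// Stands in for LogSource.
    pub log_source: String,
    /// Stands in for LogSourceEntry.
    pub log_source_entry: String,
    /// Present for Enterprise plumbing; OSS OTEL paths ignore it.
    pub time_partition: Option<String>,
}

/// Hypothetical OSS OTEL caller: it picks the fields it needs by name and
/// always passes None as the time partition, whatever the stream has set.
pub fn oss_otel_ingest_args(setup: &SetupOtelResult) -> (&str, &str, Option<&str>) {
    (setup.stream_name.as_str(), setup.log_source.as_str(), None)
}
```

Compared with `(stream_name, log_source, _, _)`, named fields make it harder to pick up the wrong element if the return shape changes again.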
📜 Review details
📒 Files selected for processing (1)
src/handlers/http/ingest.rs (10 hunks)
🧰 Additional context used
🧠 Learnings (9)
📓 Common learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
📚 Learning: 2025-08-18T18:01:22.804Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/modal/utils/ingest_utils.rs:271-292
Timestamp: 2025-08-18T18:01:22.804Z
Learning: In Parseable's ingestion validation, validate_stream_for_ingestion is designed to prevent regular log ingestion endpoints (ingest() and post_event()) from ingesting into streams that exclusively contain OTEL traces or metrics. The function allows mixed streams (regular logs + OTEL) but blocks ingestion into OTEL-only streams, maintaining architectural separation between regular log and OTEL ingestion pathways.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T17:59:31.600Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/modal/utils/ingest_utils.rs:149-156
Timestamp: 2025-08-18T17:59:31.600Z
Learning: The time_partition parameter in push_logs() function in src/handlers/http/modal/utils/ingest_utils.rs is determined by the caller (flatten_and_push_logs). OSS callers pass None, enterprise callers pass the appropriate value (None or Some<>), and OTEL callers always pass None. The push_logs() function should not add additional time partition logic since it's already handled at the caller level.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T12:30:23.342Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:127-134
Timestamp: 2025-08-18T12:30:23.342Z
Learning: In Parseable, time partition functionality (ingestion to streams with time partitions) is restricted to enterprise features only. The OSS version intentionally blocks ingestion to time-partitioned streams with the error "Ingestion is not allowed to stream with time partition", while enterprise versions allow this functionality.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T17:58:01.277Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/modal/utils/ingest_utils.rs:80-87
Timestamp: 2025-08-18T17:58:01.277Z
Learning: The architectural approach in Parseable for OTEL time partition restrictions is to enforce time_partition = None at the caller level (flatten_and_push_logs) rather than inside push_logs() itself. This allows enterprise versions to reuse push_logs() by passing actual time_partition values while OSS maintains the restriction at the ingestion entry point.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T14:48:46.282Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:256-258
Timestamp: 2025-08-18T14:48:46.282Z
Learning: OTEL streams (OtelLogs, OtelMetrics, OtelTraces) in Parseable are intentionally designed to not support time partition functionality. The time_partition parameter should always be None for OTEL ingestion paths in flatten_and_push_logs calls.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-08-18T12:37:47.703Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.703Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.
Applied to files:
src/handlers/http/ingest.rs
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.
Applied to files:
src/handlers/http/ingest.rs
🧬 Code Graph Analysis (1)
src/handlers/http/ingest.rs (2)
src/handlers/http/modal/utils/ingest_utils.rs (2)
validate_stream_for_ingestion (271-292)
flatten_and_push_logs (51-131)
src/validator.rs (1)
stream_name (33-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (1)
src/handlers/http/ingest.rs (1)
32-32: LGTM: centralized validator import is correct
Importing validate_stream_for_ingestion here aligns ingestion checks across endpoints and removes ad-hoc duplication.
Actionable comments posted: 1
♻️ Duplicate comments (3)
src/handlers/http/ingest.rs (3)
127-127: Clarify why time_partition is forced to None in OSS
Add a short comment so future changes don’t accidentally plumb time partitions through OSS paths.
Apply this diff:
    -    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
    +    // OSS: time partition is Enterprise-only; always pass None.
    +    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
247-253: OTEL ingestion correctly passes None for time_partition in OSS
Consistent with the policy that OTEL streams don’t support time partitions in OSS.
398-398: Mirror the OSS None-pass pattern with an explanatory comment
Add the same OSS gating note here for consistency with the /ingest handler.
Apply this diff:
    -    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
    +    // OSS: time partition is Enterprise-only; always pass None.
    +    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
🧹 Nitpick comments (1)
src/handlers/http/ingest.rs (1)
193-222: Return time_partition for Enterprise parity; add a note to avoid confusion in OSS
Returning time_partition here is fine, but OSS OTEL paths intentionally ignore it. A brief comment helps future maintainers.
Apply this diff:
    -    let mut time_partition = None;
    +    // NOTE: OTEL time partition is Enterprise-only. We return it here for
    +    // Enterprise builds, but OSS paths intentionally ignore it.
    +    let mut time_partition = None;
📜 Review details
📒 Files selected for processing (1)
src/handlers/http/ingest.rs (10 hunks)
🧬 Code Graph Analysis (1)
src/handlers/http/ingest.rs (1)
src/handlers/http/modal/utils/ingest_utils.rs (2)
validate_stream_for_ingestion (271-292)
flatten_and_push_logs (51-131)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (5)
src/handlers/http/ingest.rs (5)
32-32: Good call: centralize ingestion validation via shared helper
Importing validate_stream_for_ingestion consolidates gating (OTEL-only streams + OSS time-partition block) and avoids per-endpoint drift.
121-121: Centralized stream compatibility + OSS feature gating is correctly applied
Calling validate_stream_for_ingestion before ingesting ensures consistent errors and avoids duplication of checks across endpoints.
228-229: LGTM: setup_otel_stream now exposes time_partition
Returning the stream’s time_partition enables Enterprise builds to use it downstream without another fetch.
283-289: Destructuring with `..` safely adapts to the new 4-tuple
Ignoring the extra return item keeps code minimal and forward-compatible across editions.
Also applies to: 303-309, 323-329
396-396: Centralized validation also applied to /logstream/{logstream}
Consistent enforcement across both ingest endpoints. Good.
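As a rough illustration of the two gates the comments above attribute to validate_stream_for_ingestion (OTEL-only streams and the OSS time-partition block), the sketch below models them on simplified types. `LogSourceKind`, `StreamInfo`, the function name, and the OTEL-only error text are assumptions made for this example; the real function in ingest_utils.rs takes different inputs and returns `PostError`.

```rust
/// Simplified, hypothetical log source categories.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogSourceKind {
    Json,
    OtelLogs,
    OtelMetrics,
    OtelTraces,
}

/// Simplified, hypothetical view of a stream's configuration.
pub struct StreamInfo {
    pub log_sources: Vec<LogSourceKind>,
    pub time_partition: Option<String>,
}

/// Gate for the regular log ingestion endpoints:
/// 1. reject streams that hold only OTEL traces or metrics,
/// 2. reject streams that have a time partition configured (OSS gating).
pub fn validate_stream_for_ingestion_sketch(stream: &StreamInfo) -> Result<(), String> {
    let otel_only = !stream.log_sources.is_empty()
        && stream.log_sources.iter().all(|source| {
            matches!(source, LogSourceKind::OtelMetrics | LogSourceKind::OtelTraces)
        });
    if otel_only {
        // Placeholder wording; the real error message is not shown in this review.
        return Err("stream only holds OTEL traces or metrics".to_string());
    }
    if stream.time_partition.is_some() {
        return Err("Ingestion is not allowed to stream with time partition".to_string());
    }
    Ok(())
}
```

A mixed stream (regular logs plus OTEL) passes the first check, which matches the behavior described in the learnings for this PR.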
Create a stream with a custom time partition using these headers:
- X-P-Time-Partition: the field to partition by; it must be a timestamp field.
- X-P-Time-Partition-Limit: the maximum historical range; defaults to 30d.
The server validates that every event in the batch contains the time partition field. If any event is missing it, the whole batch is rejected.
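The batch rule described above can be sketched in a few lines. This is an illustrative simplification, not the code in src/utils/json/flatten.rs: events are modeled as maps from field name to Unix epoch seconds (the real server parses timestamps out of JSON), the current time is passed in explicitly, and `validate_batch` is a hypothetical name. The 30-day default mirrors the `map_or(30, |days| days.get() as i64)` expression quoted in the review learnings; the limit type `Option<NonZeroU32>` is an assumption.

```rust
use std::collections::HashMap;
use std::num::NonZeroU32;

/// Simplified event: field name -> Unix epoch seconds (assumption for brevity).
pub type Event = HashMap<String, i64>;

const SECONDS_PER_DAY: i64 = 86_400;

/// Returns Ok only if every event carries the time partition field and its
/// value is no older than the limit. One bad event rejects the whole batch.
pub fn validate_batch(
    batch: &[Event],
    time_partition: &str,
    time_partition_limit: Option<NonZeroU32>,
    now_epoch_secs: i64,
) -> Result<(), String> {
    // Fall back to 30 days when no limit was supplied.
    let limit_days = time_partition_limit.map_or(30, |days| days.get() as i64);
    let oldest_allowed = now_epoch_secs - limit_days * SECONDS_PER_DAY;

    for (index, event) in batch.iter().enumerate() {
        let timestamp = event.get(time_partition).ok_or_else(|| {
            format!("event {index} is missing time partition field '{time_partition}'")
        })?;
        if *timestamp < oldest_allowed {
            return Err(format!(
                "event {index} is older than the {limit_days}-day limit"
            ));
        }
    }
    Ok(())
}
```

Returning on the first failure gives the all-or-nothing behavior: nothing from the batch is accepted unless every event passes.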
Fixes: #1400
Summary by CodeRabbit
New Features
Refactor
Bug Fixes
Chores