
Commit 0e35b07

Updates (#1408)

- changes related to time-partition
- handled counts request
- added length check to `merge_queried_stats`

Co-authored-by: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com>

1 parent: 9712581
7 files changed: +72 −31 lines

src/alerts/alert_structs.rs (7 additions & 0 deletions)

```diff
@@ -270,6 +270,13 @@ impl AlertRequest {
             TARGETS.get_target_by_id(id).await?;
         }
         let datasets = resolve_stream_names(&self.query)?;
+
+        if datasets.len() != 1 {
+            return Err(AlertError::ValidationFailure(format!(
+                "Query should include only one dataset. Found: {datasets:?}"
+            )));
+        }
+
         let config = AlertConfig {
             version: AlertVersion::from(CURRENT_ALERTS_VERSION),
             id: Ulid::new(),
```
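The effect of this check is that alert creation now fails fast unless the query references exactly one dataset. A minimal standalone sketch of the rule, with a plain `String` error in place of the crate's `AlertError::ValidationFailure` and hypothetical stream names (`frontend`, `backend`):

```rust
// Sketch of the new single-dataset validation rule.
fn validate_single_dataset(datasets: &[String]) -> Result<(), String> {
    if datasets.len() != 1 {
        return Err(format!(
            "Query should include only one dataset. Found: {datasets:?}"
        ));
    }
    Ok(())
}

fn main() {
    // Exactly one dataset: accepted.
    assert!(validate_single_dataset(&["frontend".to_string()]).is_ok());
    // A query spanning two datasets, or none at all: rejected.
    assert!(validate_single_dataset(&["frontend".to_string(), "backend".to_string()]).is_err());
    assert!(validate_single_dataset(&[]).is_err());
}
```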

src/handlers/http/cluster/mod.rs (1 addition & 1 deletion)

```diff
@@ -1114,7 +1114,7 @@ struct QuerierStatus {
     last_used: Option<Instant>,
 }
 
-async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
+pub async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
     // Get all querier metadata
     let querier_metadata: Vec<NodeMetadata> = get_node_info(NodeType::Querier).await?;
 
```

src/handlers/http/cluster/utils.rs (9 additions & 3 deletions)

```diff
@@ -19,6 +19,7 @@
 use crate::{
     INTRA_CLUSTER_CLIENT,
     handlers::http::{base_path_without_preceding_slash, modal::NodeType},
+    prism::logstream::PrismLogstreamError,
 };
 use actix_web::http::header;
 use chrono::{DateTime, Utc};
@@ -135,7 +136,12 @@ impl StorageStats {
     }
 }
 
-pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
+pub fn merge_queried_stats(stats: Vec<QueriedStats>) -> Result<QueriedStats, PrismLogstreamError> {
+    if stats.len() < 2 {
+        return Err(PrismLogstreamError::Anyhow(anyhow::Error::msg(
+            "Expected at least two logstreams in merge_queried_stats",
+        )));
+    }
     // get the stream name
     let stream_name = stats[1].stream.clone();
 
@@ -167,12 +173,12 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
             deleted_size: acc.deleted_size + x.deleted_size,
         });
 
-    QueriedStats::new(
+    Ok(QueriedStats::new(
         &stream_name,
         min_time,
         cumulative_ingestion,
         cumulative_storage,
-    )
+    ))
 }
 
 pub async fn check_liveness(domain_name: &str) -> bool {
```
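The guard exists because the merge immediately indexes `stats[1]` for the stream name, which would panic on a vector with fewer than two entries. A minimal sketch of the new contract, using a simplified stand-in type rather than the crate's `QueriedStats`:

```rust
// Simplified stand-in for QueriedStats: just enough to show the guard.
#[derive(Debug, Clone)]
struct Stats {
    stream: String,
    events: u64,
}

fn merge_stats(stats: Vec<Stats>) -> Result<Stats, String> {
    // New length check: the code below reads stats[1], so anything
    // shorter than two entries must be rejected up front.
    if stats.len() < 2 {
        return Err("Expected at least two logstreams in merge_queried_stats".into());
    }
    let stream = stats[1].stream.clone();
    let events = stats.iter().map(|s| s.events).sum();
    Ok(Stats { stream, events })
}

fn main() {
    // A single entry is now a clean error instead of an index panic.
    let single = vec![Stats { stream: "app".into(), events: 10 }];
    assert!(merge_stats(single).is_err());
}
```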

src/handlers/http/modal/query/querier_logstream.rs (24 additions & 10 deletions)

```diff
@@ -33,14 +33,17 @@ use tracing::{error, warn};
 static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());
 
 use crate::{
-    handlers::http::{
-        base_path_without_preceding_slash,
-        cluster::{
-            self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors,
-            utils::{IngestionStats, QueriedStats, StorageStats, merge_quried_stats},
+    handlers::{
+        UPDATE_STREAM_KEY,
+        http::{
+            base_path_without_preceding_slash,
+            cluster::{
+                self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors,
+                utils::{IngestionStats, QueriedStats, StorageStats, merge_queried_stats},
+            },
+            logstream::error::StreamError,
+            modal::{NodeMetadata, NodeType},
         },
-        logstream::error::StreamError,
-        modal::{NodeMetadata, NodeType},
     },
     hottier::HotTierManager,
     parseable::{PARSEABLE, StreamNotFound},
@@ -115,14 +118,24 @@ pub async fn put_stream(
     body: Bytes,
 ) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
-    let _ = CREATE_STREAM_LOCK.lock().await;
+    let _guard = CREATE_STREAM_LOCK.lock().await;
     let headers = PARSEABLE
         .create_update_stream(req.headers(), &body, &stream_name)
        .await?;
 
+    let is_update = if let Some(val) = headers.get(UPDATE_STREAM_KEY) {
+        val.to_str().unwrap() == "true"
+    } else {
+        false
+    };
+
     sync_streams_with_ingestors(headers, body, &stream_name).await?;
 
-    Ok(("Log stream created", StatusCode::OK))
+    if is_update {
+        Ok(("Log stream updated", StatusCode::OK))
+    } else {
+        Ok(("Log stream created", StatusCode::OK))
+    }
 }
 
 pub async fn get_stats(
@@ -218,7 +231,8 @@ pub async fn get_stats(
 
     let stats = if let Some(mut ingestor_stats) = ingestor_stats {
         ingestor_stats.push(stats);
-        merge_quried_stats(ingestor_stats)
+        merge_queried_stats(ingestor_stats)
+            .map_err(|e| StreamError::Anyhow(anyhow::Error::msg(e.to_string())))?
     } else {
        stats
     };
```
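Two behavioral fixes hide in this file. The subtler one is the lock binding: in Rust, `let _ = CREATE_STREAM_LOCK.lock().await;` drops the `MutexGuard` at the end of that statement, so stream creation was never actually serialized; `let _guard = ...` keeps the guard alive for the rest of the function. A self-contained sketch of the difference with `tokio::sync::Mutex`:

```rust
use tokio::sync::Mutex;

static LOCK: Mutex<()> = Mutex::const_new(());

#[tokio::main]
async fn main() {
    // Buggy pattern: `_` is not a binding, so the guard returned by
    // lock() is dropped immediately and the mutex is released.
    let _ = LOCK.lock().await;
    assert!(LOCK.try_lock().is_ok(), "mutex should already be free");

    // Fixed pattern: `_guard` is a real binding; the guard (and the
    // lock) lives until the end of this scope.
    let _guard = LOCK.lock().await;
    assert!(LOCK.try_lock().is_err(), "mutex is genuinely held");
}
```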

src/handlers/http/query.rs (14 additions & 6 deletions)

```diff
@@ -19,6 +19,7 @@
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
 use crate::option::Mode;
+use crate::rbac::map::SessionKey;
 use crate::utils::arrow::record_batches_to_json;
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
@@ -43,7 +44,7 @@ use std::time::Instant;
 use tokio::task::JoinSet;
 use tracing::{error, warn};
 
-use crate::event::commit_schema;
+use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema};
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::parseable::{PARSEABLE, StreamNotFound};
 use crate::query::error::ExecuteError;
@@ -79,7 +80,7 @@ pub struct Query {
 /// TODO: Improve this function and make this a part of the query API
 pub async fn get_records_and_fields(
     query_request: &Query,
-    req: &HttpRequest,
+    creds: &SessionKey,
 ) -> Result<(Option<Vec<RecordBatch>>, Option<Vec<String>>), QueryError> {
     let session_state = QUERY_SESSION.state();
     let time_range =
@@ -89,8 +90,8 @@
     create_streams_for_distributed(tables.clone()).await?;
 
     let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?;
-    let creds = extract_session_key_from_req(req)?;
-    let permissions = Users.get_permissions(&creds);
+
+    let permissions = Users.get_permissions(creds);
 
     user_auth_for_datasets(&permissions, &tables).await?;
 
@@ -350,7 +351,12 @@ pub async fn get_counts(
     // if the user has given a sql query (counts call with filters applied), then use this flow
     // this could include filters or group by
     if body.conditions.is_some() {
-        let sql = body.get_df_sql().await?;
+        let time_partition = PARSEABLE
+            .get_stream(&body.stream)?
+            .get_time_partition()
+            .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.into());
+
+        let sql = body.get_df_sql(time_partition).await?;
 
         let query_request = Query {
             query: sql,
@@ -362,7 +368,9 @@
             filter_tags: None,
         };
 
-        let (records, _) = get_records_and_fields(&query_request, &req).await?;
+        let creds = extract_session_key_from_req(&req)?;
+
+        let (records, _) = get_records_and_fields(&query_request, &creds).await?;
 
         if let Some(records) = records {
             let json_records = record_batches_to_json(&records)?;
```
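Alongside the `SessionKey` refactor, the counts flow now resolves the bin column per stream instead of assuming `p_timestamp` (the crate's `DEFAULT_TIMESTAMP_KEY`, previously hardcoded in the generated SQL). A standalone sketch of the fallback; the `event_time` partition name is hypothetical:

```rust
// Sketch of the time-column resolution added to get_counts: use the
// stream's configured time partition if present, otherwise fall back
// to the default ingest timestamp column.
const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";

fn resolve_time_column(time_partition: Option<String>) -> String {
    time_partition.unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.into())
}

fn main() {
    // Stream created with a custom time partition (hypothetical name):
    assert_eq!(resolve_time_column(Some("event_time".into())), "event_time");
    // Stream without one keeps binning on p_timestamp:
    assert_eq!(resolve_time_column(None), "p_timestamp");
}
```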

src/prism/logstream/mod.rs (9 additions & 3 deletions)

```diff
@@ -30,7 +30,7 @@ use crate::{
     handlers::http::{
         cluster::{
             fetch_stats_from_ingestors,
-            utils::{IngestionStats, QueriedStats, StorageStats, merge_quried_stats},
+            utils::{IngestionStats, QueriedStats, StorageStats, merge_queried_stats},
         },
         logstream::error::StreamError,
         query::{QueryError, update_schema_when_distributed},
@@ -136,7 +136,7 @@ async fn get_stats(stream_name: &str) -> Result<QueriedStats, PrismLogstreamError> {
 
     let stats = if let Some(mut ingestor_stats) = ingestor_stats {
         ingestor_stats.push(stats);
-        merge_quried_stats(ingestor_stats)
+        merge_queried_stats(ingestor_stats)?
     } else {
         stats
     };
@@ -218,7 +218,7 @@ pub struct PrismDatasetResponse {
 
 /// Request parameters for retrieving Prism dataset information.
 /// Defines which streams to query
-#[derive(Deserialize, Default)]
+#[derive(Deserialize, Default, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct PrismDatasetRequest {
     /// List of stream names to query
@@ -381,6 +381,10 @@ pub enum PrismLogstreamError {
     Execute(#[from] ExecuteError),
     #[error("Auth: {0}")]
     Auth(#[from] actix_web::Error),
+    #[error("SerdeError: {0}")]
+    SerdeError(#[from] serde_json::Error),
+    #[error("ReqwestError: {0}")]
+    ReqwestError(#[from] reqwest::Error),
 }
 
 impl actix_web::ResponseError for PrismLogstreamError {
@@ -393,6 +397,8 @@
             PrismLogstreamError::Query(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PrismLogstreamError::TimeParse(_) => StatusCode::NOT_FOUND,
             PrismLogstreamError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PrismLogstreamError::SerdeError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PrismLogstreamError::ReqwestError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PrismLogstreamError::Auth(_) => StatusCode::UNAUTHORIZED,
         }
     }
```
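The two new variants follow the enum's existing thiserror pattern: `#[from]` derives a `From` impl, so `serde_json` and `reqwest` errors propagate into `PrismLogstreamError` via `?`. A minimal sketch of the mechanism, using a standalone enum (not the crate's full error type) and assuming the `thiserror` and `serde_json` crates the diff already uses:

```rust
use thiserror::Error;

#[derive(Debug, Error)]
enum ApiError {
    #[error("SerdeError: {0}")]
    Serde(#[from] serde_json::Error),
}

// `?` compiles because #[from] generated
// `impl From<serde_json::Error> for ApiError`.
fn parse(payload: &str) -> Result<serde_json::Value, ApiError> {
    Ok(serde_json::from_str(payload)?)
}

fn main() {
    assert!(parse("{ not json").is_err());
    assert!(parse(r#"{"ok":true}"#).is_ok());
}
```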

src/query/mod.rs (8 additions & 8 deletions)

```diff
@@ -56,7 +56,7 @@ use crate::catalog::Snapshot as CatalogSnapshot;
 use crate::catalog::column::{Int64Type, TypedStatistics};
 use crate::catalog::manifest::Manifest;
 use crate::catalog::snapshot::Snapshot;
-use crate::event::{self, DEFAULT_TIMESTAMP_KEY};
+use crate::event::DEFAULT_TIMESTAMP_KEY;
 use crate::handlers::http::query::QueryError;
 use crate::option::Mode;
 use crate::parseable::PARSEABLE;
@@ -345,7 +345,7 @@ impl CountsRequest {
             .get_stream(&self.stream)
             .map_err(|err| anyhow::Error::msg(err.to_string()))?
             .get_time_partition()
-            .unwrap_or_else(|| event::DEFAULT_TIMESTAMP_KEY.to_owned());
+            .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.to_owned());
 
         // get time range
         let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;
@@ -441,7 +441,7 @@ impl CountsRequest {
     }
 
     /// This function will get executed only if self.conditions is some
-    pub async fn get_df_sql(&self) -> Result<String, QueryError> {
+    pub async fn get_df_sql(&self, time_column: String) -> Result<String, QueryError> {
         // unwrap because we have asserted that it is some
         let count_conditions = self.conditions.as_ref().unwrap();
 
@@ -452,19 +452,19 @@
         let date_bin = if dur.num_minutes() <= 60 * 10 {
             // date_bin 1 minute
             format!(
-                "CAST(DATE_BIN('1 minute', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
+                "CAST(DATE_BIN('1 minute', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
                 self.stream
             )
         } else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 {
             // date_bin 1 hour
             format!(
-                "CAST(DATE_BIN('1 hour', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
+                "CAST(DATE_BIN('1 hour', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
                 self.stream
             )
         } else {
             // date_bin 1 day
             format!(
-                "CAST(DATE_BIN('1 day', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
+                "CAST(DATE_BIN('1 day', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
                 self.stream
             )
         };
@@ -486,7 +486,7 @@
 }
 
 /// Response for the counts API
-#[derive(Debug, Serialize, Clone)]
+#[derive(Debug, Serialize, Clone, Deserialize)]
 pub struct CountsResponse {
     /// Fields in the log stream
     pub fields: Vec<String>,
@@ -653,7 +653,7 @@ fn table_contains_any_time_filters(
         })
         .any(|expr| {
             matches!(&*expr.left, Expr::Column(Column { name, .. })
-                if name == time_column)
+                if name == &default_timestamp || name == time_column)
         })
 }
```
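To see what the parameterized SQL produces: for a hypothetical stream `app` whose time partition is `event_time`, the 1-minute branch of `get_df_sql` now quotes the partition column into both `DATE_BIN` calls where `p_timestamp` used to be hardcoded. A standalone sketch of that branch:

```rust
// Sketch of the 1-minute branch of get_df_sql after this change;
// "app" and "event_time" are hypothetical stream/column names.
fn date_bin_minute(stream: &str, time_column: &str) -> String {
    format!(
        "CAST(DATE_BIN('1 minute', \"{stream}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, \
         DATE_BIN('1 minute', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time"
    )
}

fn main() {
    // Prints the projection with the custom column in place of p_timestamp.
    println!("{}", date_bin_minute("app", "event_time"));
}
```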
