Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/hottier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
catalog::manifest::{File, Manifest},
handlers::http::cluster::INTERNAL_STREAM_NAME,
parseable::PARSEABLE,
storage::{ObjectStorage, ObjectStorageError},
storage::{ObjectStorage, ObjectStorageError, field_stats::DATASET_STATS_STREAM_NAME},
utils::{extract_datetime, human_size::bytes_to_human_size},
validator::error::HotTierValidationError,
};
Expand Down Expand Up @@ -252,6 +252,11 @@ impl HotTierManager {

///sync the hot tier files from S3 to the hot tier directory for all streams
async fn sync_hot_tier(&self) -> Result<(), HotTierError> {
// Before syncing, check if pstats stream was created and needs hot tier
if let Err(e) = self.create_pstats_hot_tier().await {
tracing::trace!("Skipping pstats hot tier creation because of error: {e}");
}

let mut sync_hot_tier_tasks = FuturesUnordered::new();
for stream in PARSEABLE.streams.list() {
if self.check_stream_hot_tier_exists(&stream) {
Expand Down Expand Up @@ -708,6 +713,30 @@ impl HotTierManager {
Ok(())
}

/// Creates hot tier for pstats internal stream if the stream exists in storage
async fn create_pstats_hot_tier(&self) -> Result<(), HotTierError> {
// Check if pstats hot tier already exists
if !self.check_stream_hot_tier_exists(DATASET_STATS_STREAM_NAME) {
// Check if pstats stream exists in storage by attempting to load it
if PARSEABLE
.check_or_load_stream(DATASET_STATS_STREAM_NAME)
.await
{
let mut stream_hot_tier = StreamHotTier {
version: Some(CURRENT_HOT_TIER_VERSION.to_string()),
size: MIN_STREAM_HOT_TIER_SIZE_BYTES,
used_size: 0,
available_size: MIN_STREAM_HOT_TIER_SIZE_BYTES,
oldest_date_time_entry: None,
};
self.put_hot_tier(DATASET_STATS_STREAM_NAME, &mut stream_hot_tier)
.await?;
}
}

Ok(())
}

/// Get the disk usage for the hot tier storage path. If we have a three disk paritions
/// mounted as follows:
/// 1. /
Expand Down
Loading