Skip to content

Commit f513d92

Browse files
committed
Add metastore refresh poll
1 parent a0c1760 commit f513d92

File tree

3 files changed

+28
-35
lines changed

3 files changed

+28
-35
lines changed

quickwit/quickwit-lambda/src/indexer/environment.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ use std::env::var;
2121

2222
use once_cell::sync::Lazy;
2323

24-
pub const CONFIGURATION_TEMPLATE: &str = "version: 0.8
24+
pub const CONFIGURATION_TEMPLATE: &str = r#"
25+
version: 0.8
2526
node_id: lambda-indexer
2627
cluster_id: lambda-ephemeral
2728
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index
2829
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index
2930
data_dir: /tmp
30-
";
31+
"#;
3132

3233
pub static INDEX_CONFIG_URI: Lazy<String> = Lazy::new(|| {
3334
var("QW_LAMBDA_INDEX_CONFIG_URI").expect("QW_LAMBDA_INDEX_CONFIG_URI must be set")

quickwit/quickwit-lambda/src/searcher/environment.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717
// You should have received a copy of the GNU Affero General Public License
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

20-
pub(crate) const CONFIGURATION_TEMPLATE: &str = "version: 0.8
20+
pub(crate) const CONFIGURATION_TEMPLATE: &str = r#"
21+
version: 0.8
2122
node_id: lambda-searcher
22-
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index
23+
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index#polling_interval=${QW_LAMBDA_SEARCHER_METASTORE_POLLING_INTERVAL:-1m}
2324
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index
2425
data_dir: /tmp
2526
searcher:
2627
partial_request_cache_capacity: ${QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY:-64M}
27-
";
28+
"#;

quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,8 @@ use tracing::error;
2929
use super::file_backed_index::FileBackedIndex;
3030
use super::store_operations::{load_index, METASTORE_FILE_NAME};
3131

32-
/// Lazy [`FileBackedIndex`]. It loads a `FileBackedIndex`
33-
/// on demand and optionally spawns a task to poll
34-
/// regularly the storage and update the index.
32+
/// Lazy [`FileBackedIndex`]. It loads a `FileBackedIndex` on demand. When the index is first
33+
/// loaded, it optionally spawns a task to regularly poll the storage and update the index.
3534
pub(crate) struct LazyFileBackedIndex {
3635
index_id: IndexId,
3736
storage: Arc<dyn Storage>,
@@ -48,8 +47,8 @@ impl LazyFileBackedIndex {
4847
file_backed_index: Option<FileBackedIndex>,
4948
) -> Self {
5049
let index_mutex_opt = file_backed_index.map(|index| Arc::new(Mutex::new(index)));
51-
// If an index is given and a polling interval is given,
52-
// spawn immediately the polling task.
50+
// If the polling interval is configured and the index is already loaded,
51+
// spawn immediately the polling task
5352
if let Some(index_mutex) = &index_mutex_opt {
5453
if let Some(polling_interval) = polling_interval_opt {
5554
spawn_index_metadata_polling_task(
@@ -68,15 +67,24 @@ impl LazyFileBackedIndex {
6867
}
6968
}
7069

71-
/// Get `FileBackedIndex`.
70+
/// Get a syncronized `FileBackedIndex`. If the index wasn't provided on creation, we load it
71+
/// lazily on the first call of this method.
7272
pub async fn get(&self) -> MetastoreResult<Arc<Mutex<FileBackedIndex>>> {
7373
self.lazy_index
74-
.get_or_try_init(|| {
75-
load_file_backed_index(
76-
self.storage.clone(),
77-
self.index_id.clone(),
78-
self.polling_interval_opt,
79-
)
74+
.get_or_try_init(|| async move {
75+
let index = load_index(&*self.storage, &self.index_id).await?;
76+
let index_mutex = Arc::new(Mutex::new(index));
77+
// When the index is loaded lazily, the polling task is not started in the
78+
// constructor so we do it here when the index is actually loaded.
79+
if let Some(polling_interval) = self.polling_interval_opt {
80+
spawn_index_metadata_polling_task(
81+
self.storage.clone(),
82+
self.index_id.clone(),
83+
Arc::downgrade(&index_mutex),
84+
polling_interval,
85+
);
86+
}
87+
Ok(index_mutex)
8088
})
8189
.await
8290
.cloned()
@@ -122,6 +130,7 @@ fn spawn_index_metadata_polling_task(
122130
) {
123131
tokio::task::spawn(async move {
124132
let mut interval = tokio::time::interval(polling_interval);
133+
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
125134
interval.tick().await; //< this is to prevent fetch right after the first population of the data.
126135

127136
while let Some(metadata_mutex) = metastore_weak.upgrade() {
@@ -130,21 +139,3 @@ fn spawn_index_metadata_polling_task(
130139
}
131140
});
132141
}
133-
134-
async fn load_file_backed_index(
135-
storage: Arc<dyn Storage>,
136-
index_id: IndexId,
137-
polling_interval_opt: Option<Duration>,
138-
) -> MetastoreResult<Arc<Mutex<FileBackedIndex>>> {
139-
let index = load_index(&*storage, &index_id).await?;
140-
let index_mutex = Arc::new(Mutex::new(index));
141-
if let Some(polling_interval) = polling_interval_opt {
142-
spawn_index_metadata_polling_task(
143-
storage.clone(),
144-
index_id.clone(),
145-
Arc::downgrade(&index_mutex),
146-
polling_interval,
147-
);
148-
}
149-
Ok(index_mutex)
150-
}

0 commit comments

Comments
 (0)