Skip to content

Commit f1381c0

Browse files
committed
add prefix-based filtering and lazy segment loading to RunReader
1 parent 323c0b3 commit f1381c0

File tree

2 files changed

+218
-14
lines changed

2 files changed

+218
-14
lines changed

src/run_reader.rs

Lines changed: 197 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,24 @@ impl RunReader {
2929

3030
let (lo, hi) = run.range_indexes(&range)?;
3131

32+
// Check if any segment in the range might contain data
33+
// This provides early filtering for prefix-based queries
34+
let segments_in_range = &run[lo..=hi];
35+
let mut has_potential_match = false;
36+
37+
for segment in segments_in_range {
38+
// Check if segment might contain data for this range
39+
// If segment has no prefix extractor or can't determine, assume it might contain data
40+
if segment.might_contain_range(&range) {
41+
has_potential_match = true;
42+
break;
43+
}
44+
}
45+
46+
if !has_potential_match {
47+
return None;
48+
}
49+
3250
Some(Self::culled(run, range, (Some(lo), Some(hi)), cache_policy))
3351
}
3452

@@ -83,12 +101,22 @@ impl Iterator for RunReader {
83101
self.lo += 1;
84102

85103
if self.lo < self.hi {
86-
self.lo_reader = self
87-
.run
88-
.get(self.lo)
89-
.expect("should exist")
90-
.iter()
91-
.map(|x| Box::new(x) as BoxedIterator); /* .cache_policy(self.cache_policy) */
104+
// Lazily check next segment for potential matches
105+
// This avoids unnecessary I/O for segments that won't contain our prefix
106+
loop {
107+
if self.lo >= self.hi {
108+
break;
109+
}
110+
111+
let segment = self.run.get(self.lo).expect("should exist");
112+
if let Some(reader) = segment.iter() {
113+
self.lo_reader = Some(Box::new(reader) as BoxedIterator);
114+
break;
115+
}
116+
117+
// Skip this segment as it doesn't contain our range
118+
self.lo += 1;
119+
}
92120
}
93121
} else if let Some(hi_reader) = &mut self.hi_reader {
94122
// NOTE: We reached the hi marker, so consume from it instead
@@ -115,12 +143,21 @@ impl DoubleEndedIterator for RunReader {
115143
self.hi -= 1;
116144

117145
if self.lo < self.hi {
118-
self.hi_reader = self
119-
.run
120-
.get(self.hi)
121-
.expect("should exist")
122-
.iter()
123-
.map(|x| Box::new(x) as BoxedIterator); /* .cache_policy(self.cache_policy) */
146+
// Lazily check prev segment for potential matches
147+
loop {
148+
if self.hi <= self.lo {
149+
break;
150+
}
151+
152+
let segment = self.run.get(self.hi).expect("should exist");
153+
if let Some(reader) = segment.iter() {
154+
self.hi_reader = Some(Box::new(reader) as BoxedIterator);
155+
break;
156+
}
157+
158+
// Skip this segment as it doesn't contain our range
159+
self.hi -= 1;
160+
}
124161
}
125162
} else if let Some(lo_reader) = &mut self.lo_reader {
126163
// NOTE: We reached the lo marker, so consume from it instead
@@ -138,7 +175,8 @@ impl DoubleEndedIterator for RunReader {
138175
#[allow(clippy::expect_used)]
139176
mod tests {
140177
use super::*;
141-
use crate::{AbstractTree, Slice};
178+
use crate::{AbstractTree, Config, Slice};
179+
use std::sync::Arc;
142180
use test_log::test;
143181

144182
#[test]
@@ -299,4 +337,150 @@ mod tests {
299337

300338
Ok(())
301339
}
340+
341+
#[test]
342+
fn test_run_reader_prefix_filtering() -> crate::Result<()> {
343+
use crate::prefix::FixedPrefixExtractor;
344+
345+
let tempdir = tempfile::tempdir()?;
346+
let tree = Config::new(&tempdir)
347+
.prefix_extractor(Arc::new(FixedPrefixExtractor::new(3)))
348+
.open()?;
349+
350+
// Create segments with different prefixes
351+
let prefixes = [
352+
["aaa_1", "aaa_2", "aaa_3"],
353+
["bbb_1", "bbb_2", "bbb_3"],
354+
["ccc_1", "ccc_2", "ccc_3"],
355+
["ddd_1", "ddd_2", "ddd_3"],
356+
];
357+
358+
for batch in prefixes {
359+
for id in batch {
360+
tree.insert(id, vec![], 0);
361+
}
362+
tree.flush_active_memtable(0)?;
363+
}
364+
365+
let segments = tree
366+
.manifest
367+
.read()
368+
.expect("lock is poisoned")
369+
.iter()
370+
.cloned()
371+
.collect::<Vec<_>>();
372+
373+
let run = Arc::new(Run::new(segments));
374+
375+
// Test 1: Query for non-existent prefix should return None
376+
assert!(
377+
RunReader::new(
378+
run.clone(),
379+
UserKey::from("zzz_1")..=UserKey::from("zzz_9"),
380+
CachePolicy::Read
381+
)
382+
.is_none(),
383+
"Should return None for non-existent prefix"
384+
);
385+
386+
// Test 2: Query for existing prefix should return reader
387+
let reader = RunReader::new(
388+
run.clone(),
389+
UserKey::from("bbb_1")..=UserKey::from("bbb_3"),
390+
CachePolicy::Read,
391+
);
392+
assert!(reader.is_some(), "Should return reader for existing prefix");
393+
394+
if let Some(reader) = reader {
395+
let items: Vec<_> = reader.flatten().map(|item| item.key.user_key).collect();
396+
assert_eq!(items.len(), 3);
397+
assert_eq!(items.first(), Some(&Slice::from(*b"bbb_1")));
398+
assert_eq!(items.get(1), Some(&Slice::from(*b"bbb_2")));
399+
assert_eq!(items.get(2), Some(&Slice::from(*b"bbb_3")));
400+
}
401+
402+
// Test 3: Range query across prefixes with no common prefix
403+
let reader = RunReader::new(
404+
run,
405+
UserKey::from("aaa_3")..=UserKey::from("bbb_1"),
406+
CachePolicy::Read,
407+
);
408+
// Should still work since segments contain the range
409+
assert!(reader.is_some());
410+
411+
Ok(())
412+
}
413+
414+
#[test]
415+
fn test_run_reader_lazy_segment_loading() -> crate::Result<()> {
416+
use crate::prefix::FixedPrefixExtractor;
417+
418+
let tempdir = tempfile::tempdir()?;
419+
let tree = Config::new(&tempdir)
420+
.prefix_extractor(Arc::new(FixedPrefixExtractor::new(4)))
421+
.open()?;
422+
423+
// Create many segments with distinct prefixes
424+
let prefixes = [
425+
["pre1_a", "pre1_b", "pre1_c"],
426+
["pre2_a", "pre2_b", "pre2_c"],
427+
["pre3_a", "pre3_b", "pre3_c"],
428+
["pre4_a", "pre4_b", "pre4_c"],
429+
["pre5_a", "pre5_b", "pre5_c"],
430+
["pre6_a", "pre6_b", "pre6_c"],
431+
];
432+
433+
for batch in prefixes {
434+
for id in batch {
435+
tree.insert(id, vec![], 0);
436+
}
437+
tree.flush_active_memtable(0)?;
438+
}
439+
440+
let segments = tree
441+
.manifest
442+
.read()
443+
.expect("lock is poisoned")
444+
.iter()
445+
.cloned()
446+
.collect::<Vec<_>>();
447+
448+
let run = Arc::new(Run::new(segments));
449+
450+
// Query for a specific prefix in the middle
451+
// Should skip segments without the prefix lazily
452+
let reader = RunReader::new(
453+
run.clone(),
454+
UserKey::from("pre4_a")..=UserKey::from("pre4_c"),
455+
CachePolicy::Read,
456+
);
457+
458+
assert!(reader.is_some());
459+
460+
if let Some(reader) = reader {
461+
let items: Vec<_> = reader.flatten().map(|item| item.key.user_key).collect();
462+
assert_eq!(items.len(), 3);
463+
assert_eq!(items.first(), Some(&Slice::from(*b"pre4_a")));
464+
assert_eq!(items.get(1), Some(&Slice::from(*b"pre4_b")));
465+
assert_eq!(items.get(2), Some(&Slice::from(*b"pre4_c")));
466+
}
467+
468+
// Query for prefix at the beginning
469+
let reader = RunReader::new(
470+
run.clone(),
471+
UserKey::from("pre1_a")..=UserKey::from("pre1_c"),
472+
CachePolicy::Read,
473+
);
474+
assert!(reader.is_some());
475+
476+
// Query for prefix at the end
477+
let reader = RunReader::new(
478+
run,
479+
UserKey::from("pre6_a")..=UserKey::from("pre6_c"),
480+
CachePolicy::Read,
481+
);
482+
assert!(reader.is_some());
483+
484+
Ok(())
485+
}
302486
}

src/segment/mod.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,10 +653,30 @@ impl Segment {
653653
self.metadata.key_range.overlaps_with_bounds(bounds)
654654
}
655655

656+
/// Returns the seqno range of the `Segment`.
657+
#[must_use]
658+
pub fn seqno_range(&self) -> (SeqNo, SeqNo) {
659+
self.0.metadata.seqnos
660+
}
661+
656662
/// Returns the highest sequence number in the segment.
657663
#[must_use]
658664
pub fn get_highest_seqno(&self) -> SeqNo {
659-
self.metadata.seqnos.1
665+
self.0.metadata.seqnos.1
666+
}
667+
668+
/// Checks if this segment might contain data for the given range.
669+
/// Returns false only if we can definitively rule out the segment using bloom filters.
670+
/// Returns true if the segment might contain data (or if we can't determine).
671+
#[must_use]
672+
pub fn might_contain_range<R: RangeBounds<UserKey>>(&self, range: &R) -> bool {
673+
// If no prefix extractor, we can't use bloom filter optimization
674+
if self.prefix_extractor.is_none() {
675+
return true;
676+
}
677+
678+
// Check if we can skip this segment based on bloom filter
679+
!self.should_skip_range_by_prefix_bloom_filter(range)
660680
}
661681

662682
/// Returns the amount of tombstone markers in the `Segment`.

0 commit comments

Comments
 (0)