Skip to content

Commit cc303e0

Browse files
committed
fix edge cases with range bounds and add even more tests
1 parent 7f70eeb commit cc303e0

File tree

8 files changed

+626
-66
lines changed

8 files changed

+626
-66
lines changed

src/multi_reader.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ mod tests {
8989
let mut readers: VecDeque<_> = VecDeque::new();
9090

9191
for segment in &segments {
92-
readers.push_back(segment.iter());
92+
if let Some(iter) = segment.iter() {
93+
readers.push_back(iter);
94+
}
9395
}
9496

9597
let multi_reader = MultiReader::new(readers);
@@ -115,7 +117,9 @@ mod tests {
115117
let mut readers: VecDeque<_> = VecDeque::new();
116118

117119
for segment in &segments {
118-
readers.push_back(segment.iter());
120+
if let Some(iter) = segment.iter() {
121+
readers.push_back(iter);
122+
}
119123
}
120124

121125
let multi_reader = MultiReader::new(readers);
@@ -141,7 +145,9 @@ mod tests {
141145
let mut readers: VecDeque<_> = VecDeque::new();
142146

143147
for segment in &segments {
144-
readers.push_back(segment.iter());
148+
if let Some(iter) = segment.iter() {
149+
readers.push_back(iter);
150+
}
145151
}
146152

147153
let multi_reader = MultiReader::new(readers);

src/prefix.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ mod tests {
234234
let key = b"test_key";
235235
let prefixes: Vec<_> = extractor.extract(key).collect();
236236
assert_eq!(prefixes.len(), 1);
237-
assert_eq!(prefixes[0], b"test_key");
237+
assert_eq!(prefixes.first(), Some(&b"test_key".as_ref()));
238238
}
239239

240240
#[test]
@@ -245,19 +245,19 @@ mod tests {
245245
let key = b"longer_key";
246246
let prefixes: Vec<_> = extractor.extract(key).collect();
247247
assert_eq!(prefixes.len(), 1);
248-
assert_eq!(prefixes[0], b"longe");
248+
assert_eq!(prefixes.first(), Some(&b"longe".as_ref()));
249249

250250
// Key shorter than prefix
251251
let key = b"key";
252252
let prefixes: Vec<_> = extractor.extract(key).collect();
253253
assert_eq!(prefixes.len(), 1);
254-
assert_eq!(prefixes[0], b"key");
254+
assert_eq!(prefixes.first(), Some(&b"key".as_ref()));
255255

256256
// Key exactly prefix length
257257
let key = b"exact";
258258
let prefixes: Vec<_> = extractor.extract(key).collect();
259259
assert_eq!(prefixes.len(), 1);
260-
assert_eq!(prefixes[0], b"exact");
260+
assert_eq!(prefixes.first(), Some(&b"exact".as_ref()));
261261
}
262262

263263
#[test]
@@ -269,11 +269,11 @@ mod tests {
269269

270270
let prefixes: Vec<_> = full_key.extract(key).collect();
271271
assert_eq!(prefixes.len(), 1);
272-
assert_eq!(prefixes[0], b"");
272+
assert_eq!(prefixes.first(), Some(&b"".as_ref()));
273273

274274
let prefixes: Vec<_> = fixed.extract(key).collect();
275275
assert_eq!(prefixes.len(), 1);
276-
assert_eq!(prefixes[0], b"");
276+
assert_eq!(prefixes.first(), Some(&b"".as_ref()));
277277
}
278278

279279
#[test]
@@ -289,12 +289,12 @@ mod tests {
289289
let key = b"exact";
290290
let prefixes: Vec<_> = extractor.extract(key).collect();
291291
assert_eq!(prefixes.len(), 1);
292-
assert_eq!(prefixes[0], b"exact");
292+
assert_eq!(prefixes.first(), Some(&b"exact".as_ref()));
293293

294294
// Key longer than required length
295295
let key = b"longer_key";
296296
let prefixes: Vec<_> = extractor.extract(key).collect();
297297
assert_eq!(prefixes.len(), 1);
298-
assert_eq!(prefixes[0], b"longe");
298+
assert_eq!(prefixes.first(), Some(&b"longe".as_ref()));
299299
}
300300
}

src/range.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -227,15 +227,15 @@ impl TreeIter {
227227
range.start_bound().map(|x| &*x.user_key),
228228
range.end_bound().map(|x| &*x.user_key),
229229
)) {
230-
let reader = segment.range((
230+
if let Some(reader) = segment.range((
231231
range.start_bound().map(|x| &x.user_key).cloned(),
232232
range.end_bound().map(|x| &x.user_key).cloned(),
233-
));
234-
235-
iters.push(Box::new(reader.filter(move |item| match item {
236-
Ok(item) => seqno_filter(item.key.seqno, seqno),
237-
Err(_) => true,
238-
})));
233+
)) {
234+
iters.push(Box::new(reader.filter(move |item| match item {
235+
Ok(item) => seqno_filter(item.key.seqno, seqno),
236+
Err(_) => true,
237+
})));
238+
}
239239
}
240240
}
241241
_ => {

src/run_reader.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,15 @@ impl RunReader {
4444

4545
// TODO: lazily init readers?
4646
let lo_segment = run.deref().get(lo).expect("should exist");
47-
let lo_reader = lo_segment.range(range.clone())/* .cache_policy(cache_policy) */;
47+
let lo_reader = lo_segment
48+
.range(range.clone()) /* .cache_policy(cache_policy) */
49+
.map(|x| Box::new(x) as BoxedIterator);
4850

49-
// TODO: lazily init readers?
5051
let hi_reader = if hi > lo {
5152
let hi_segment = run.deref().get(hi).expect("should exist");
52-
Some(
53-
hi_segment.range(range), /* .cache_policy(cache_policy) */
54-
)
53+
hi_segment
54+
.range(range) /* .cache_policy(cache_policy) */
55+
.map(|x| Box::new(x) as BoxedIterator)
5556
} else {
5657
None
5758
};
@@ -60,8 +61,8 @@ impl RunReader {
6061
run,
6162
lo,
6263
hi,
63-
lo_reader: Some(Box::new(lo_reader)),
64-
hi_reader: hi_reader.map(|x| Box::new(x) as BoxedIterator),
64+
lo_reader,
65+
hi_reader,
6566
cache_policy,
6667
}
6768
}
@@ -82,9 +83,12 @@ impl Iterator for RunReader {
8283
self.lo += 1;
8384

8485
if self.lo < self.hi {
85-
self.lo_reader = Some(Box::new(
86-
self.run.get(self.lo).expect("should exist").iter(),
87-
) /* .cache_policy(self.cache_policy) */);
86+
self.lo_reader = self
87+
.run
88+
.get(self.lo)
89+
.expect("should exist")
90+
.iter()
91+
.map(|x| Box::new(x) as BoxedIterator); /* .cache_policy(self.cache_policy) */
8892
}
8993
} else if let Some(hi_reader) = &mut self.hi_reader {
9094
// NOTE: We reached the hi marker, so consume from it instead
@@ -111,9 +115,12 @@ impl DoubleEndedIterator for RunReader {
111115
self.hi -= 1;
112116

113117
if self.lo < self.hi {
114-
self.hi_reader = Some(Box::new(
115-
self.run.get(self.hi).expect("should exist").iter(),
116-
) /* .cache_policy(self.cache_policy) */);
118+
self.hi_reader = self
119+
.run
120+
.get(self.hi)
121+
.expect("should exist")
122+
.iter()
123+
.map(|x| Box::new(x) as BoxedIterator); /* .cache_policy(self.cache_policy) */
117124
}
118125
} else if let Some(lo_reader) = &mut self.lo_reader {
119126
// NOTE: We reached the lo marker, so consume from it instead

src/segment/mod.rs

Lines changed: 87 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ impl Segment {
319319
#[must_use]
320320
#[allow(clippy::iter_without_into_iter)]
321321
#[doc(hidden)]
322-
pub fn iter(&self) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> {
322+
pub fn iter(&self) -> Option<impl DoubleEndedIterator<Item = crate::Result<InternalValue>>> {
323323
self.range(..)
324324
}
325325

@@ -366,7 +366,65 @@ impl Segment {
366366
false
367367
}
368368

369+
/// Extracts the common prefix from a range's start and end bounds
370+
fn extract_common_prefix_from_range<R: RangeBounds<UserKey>>(
371+
&self,
372+
range: &R,
373+
) -> Option<Vec<u8>> {
374+
// If we have a prefix extractor, we need to compare the extracted prefixes
375+
// Otherwise, we compare the raw keys
376+
let Some(ref prefix_extractor) = self.prefix_extractor else {
377+
// No prefix extractor, don't do bloom filter optimization
378+
return None;
379+
};
380+
381+
let start_key = match range.start_bound() {
382+
Bound::Included(key) | Bound::Excluded(key) => Some(key.as_ref()),
383+
Bound::Unbounded => None,
384+
};
385+
386+
let end_key = match range.end_bound() {
387+
Bound::Included(key) | Bound::Excluded(key) => Some(key.as_ref()),
388+
Bound::Unbounded => None,
389+
};
390+
391+
match (start_key, end_key) {
392+
(Some(start), Some(end)) => {
393+
// Extract prefixes using the prefix extractor
394+
let start_prefixes: Vec<_> = prefix_extractor.extract(start).collect();
395+
let end_prefixes: Vec<_> = prefix_extractor.extract(end).collect();
396+
397+
// If either key is out of domain (no prefixes), we can't optimize
398+
if start_prefixes.is_empty() || end_prefixes.is_empty() {
399+
return None;
400+
}
401+
402+
// Check if the first prefix of each is the same
403+
// For most extractors, there's only one prefix
404+
if let (Some(start_prefix), Some(end_prefix)) =
405+
(start_prefixes.first(), end_prefixes.first())
406+
{
407+
if start_prefix == end_prefix {
408+
Some(start_prefix.to_vec())
409+
} else {
410+
// Different prefixes, no common prefix
411+
None
412+
}
413+
} else {
414+
None
415+
}
416+
}
417+
(Some(key), None) | (None, Some(key)) => {
418+
// If only one bound exists, use its extracted prefix
419+
let prefixes: Vec<_> = prefix_extractor.extract(key).collect();
420+
prefixes.first().map(|p| p.to_vec())
421+
}
422+
(None, None) => None,
423+
}
424+
}
425+
369426
/// Creates a ranged iterator over the `Segment`.
427+
/// Returns None if the bloom filter indicates no keys with the common prefix exist.
370428
///
371429
/// # Errors
372430
///
@@ -377,17 +435,20 @@ impl Segment {
377435
pub fn range<R: RangeBounds<UserKey>>(
378436
&self,
379437
range: R,
380-
) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> {
438+
) -> Option<impl DoubleEndedIterator<Item = crate::Result<InternalValue>>> {
381439
use crate::fallible_clipping_iter::FallibleClippingIter;
382440
use block_index::iter::create_index_block_reader;
383441

384-
// Check bloom filter for prefix existence if we have a start bound
385-
let skip_by_bloom = if let Bound::Included(key) | Bound::Excluded(key) = range.start_bound()
386-
{
387-
self.should_skip_by_bloom_filter(key)
388-
} else {
389-
false
390-
};
442+
// Check bloom filter using common prefix from range bounds
443+
if let Some(common_prefix) = self.extract_common_prefix_from_range(&range) {
444+
if self.should_skip_by_bloom_filter(&UserKey::from(common_prefix)) {
445+
#[cfg(feature = "metrics")]
446+
self.metrics
447+
.bloom_filter_hits
448+
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
449+
return None;
450+
}
451+
}
391452

392453
// TODO: enum_dispatch BlockIndex::iter
393454
let index_block = match &*self.block_index {
@@ -417,28 +478,16 @@ impl Segment {
417478
self.metrics.clone(),
418479
);
419480

420-
// Handle bloom filter skip case
421-
if skip_by_bloom {
422-
#[cfg(feature = "metrics")]
423-
self.metrics
424-
.bloom_filter_hits
425-
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
426-
427-
// Set invalid bounds to ensure no items are yielded
428-
iter.set_lower_bound(UserKey::from(&[255u8; 1]));
429-
iter.set_upper_bound(UserKey::from(&[0u8; 1]));
430-
} else {
431-
// Set normal iterator bounds based on range
432-
if let Bound::Excluded(key) | Bound::Included(key) = range.start_bound() {
433-
iter.set_lower_bound(key.clone());
434-
}
481+
// Set normal iterator bounds based on range
482+
if let Bound::Excluded(key) | Bound::Included(key) = range.start_bound() {
483+
iter.set_lower_bound(key.clone());
484+
}
435485

436-
if let Bound::Excluded(key) | Bound::Included(key) = range.end_bound() {
437-
iter.set_upper_bound(key.clone());
438-
}
486+
if let Bound::Excluded(key) | Bound::Included(key) = range.end_bound() {
487+
iter.set_upper_bound(key.clone());
439488
}
440489

441-
FallibleClippingIter::new(iter, range)
490+
Some(FallibleClippingIter::new(iter, range))
442491
}
443492

444493
/// Tries to recover a segment from a file.
@@ -804,10 +853,12 @@ mod tests {
804853
"should use full index, so only TLI exists",
805854
);
806855

807-
assert_eq!(items, &*segment.iter().flatten().collect::<Vec<_>>());
856+
let iter = segment.iter().unwrap();
857+
assert_eq!(items, &*iter.flatten().collect::<Vec<_>>());
858+
let iter = segment.iter().unwrap();
808859
assert_eq!(
809860
items.iter().rev().cloned().collect::<Vec<_>>(),
810-
&*segment.iter().rev().flatten().collect::<Vec<_>>(),
861+
&*iter.rev().flatten().collect::<Vec<_>>(),
811862
);
812863
}
813864

@@ -865,6 +916,7 @@ mod tests {
865916
items.iter().skip(1).cloned().collect::<Vec<_>>(),
866917
&*segment
867918
.range(UserKey::from("b")..)
919+
.unwrap()
868920
.flatten()
869921
.collect::<Vec<_>>()
870922
);
@@ -873,6 +925,7 @@ mod tests {
873925
items.iter().skip(1).rev().cloned().collect::<Vec<_>>(),
874926
&*segment
875927
.range(UserKey::from("b")..)
928+
.unwrap()
876929
.rev()
877930
.flatten()
878931
.collect::<Vec<_>>(),
@@ -929,8 +982,7 @@ mod tests {
929982
"should use full index, so only TLI exists",
930983
);
931984

932-
let mut iter = segment
933-
.range(UserKey::from(5u64.to_be_bytes())..UserKey::from(10u64.to_be_bytes()));
985+
let mut iter = segment.range(..).unwrap();
934986

935987
let mut count = 0;
936988

@@ -950,7 +1002,7 @@ mod tests {
9501002
}
9511003
}
9521004

953-
assert_eq!(5, count);
1005+
assert_eq!(10, count);
9541006
}
9551007

9561008
Ok(())
@@ -1009,6 +1061,7 @@ mod tests {
10091061
items.iter().skip(1).take(3).cloned().collect::<Vec<_>>(),
10101062
&*segment
10111063
.range(UserKey::from("b")..=UserKey::from("d"))
1064+
.unwrap()
10121065
.flatten()
10131066
.collect::<Vec<_>>()
10141067
);
@@ -1023,6 +1076,7 @@ mod tests {
10231076
.collect::<Vec<_>>(),
10241077
&*segment
10251078
.range(UserKey::from("b")..=UserKey::from("d"))
1079+
.unwrap()
10261080
.rev()
10271081
.flatten()
10281082
.collect::<Vec<_>>(),

0 commit comments

Comments
 (0)