Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ pub trait AbstractTree {
/// Returns the approximate number of tombstones in the tree.
fn tombstone_count(&self) -> u64;

/// Returns the approximate number of weak tombstones (single deletes) in the tree.
///
/// NOTE(review): what makes the value "approximate" is up to the implementor;
/// presumably it is derived from stored per-segment counters rather than a live
/// scan — confirm against the implementations.
fn weak_tombstone_count(&self) -> u64;

/// Returns the approximate number of values reclaimable once weak tombstones can be GC'd.
///
/// NOTE(review): presumably a value counts as reclaimable when it is shadowed by a
/// weak tombstone of the same key — confirm against the implementations.
fn weak_tombstone_reclaimable_count(&self) -> u64;

// TODO: clear() with Nuke compaction strategy (write lock) -> drop_range(..)

/// Drops segments that are fully contained in a given range.
Expand Down
8 changes: 8 additions & 0 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,14 @@ impl AbstractTree for BlobTree {
self.index.tombstone_count()
}

/// Forwards the query to `self.index` and returns its weak tombstone count unchanged.
fn weak_tombstone_count(&self) -> u64 {
self.index.weak_tombstone_count()
}

/// Forwards the query to `self.index` and returns its reclaimable count unchanged.
fn weak_tombstone_reclaimable_count(&self) -> u64 {
self.index.weak_tombstone_reclaimable_count()
}

/// Forwards `range` to `self.index.drop_range` and returns its result as-is.
fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
self.index.drop_range(range)
}
Expand Down
4 changes: 4 additions & 0 deletions src/compaction/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ impl CompactionStrategy for Strategy {
let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];

{
// TODO(weak-tombstone-rewrite): incorporate `Segment::weak_tombstone_count` and
// `Segment::weak_tombstone_reclaimable` when computing level scores so rewrite
// decisions can prioritize segments that would free the most reclaimable values.

// Score first level

// NOTE: We always have at least one level
Expand Down
6 changes: 6 additions & 0 deletions src/segment/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub struct ParsedMeta {
pub file_size: u64,
pub item_count: u64,
pub tombstone_count: u64,
pub weak_tombstone_count: u64,
pub weak_tombstone_reclaimable: u64,

pub data_block_compression: CompressionType,
pub index_block_compression: CompressionType,
Expand Down Expand Up @@ -125,6 +127,8 @@ impl ParsedMeta {
let data_block_count = read_u64!(block, b"#data_block_count");
let index_block_count = read_u64!(block, b"#index_block_count");
let file_size = read_u64!(block, b"#size"); // TODO: rename file_size
let weak_tombstone_count = read_u64!(block, b"#weak_tombstone_count");
let weak_tombstone_reclaimable = read_u64!(block, b"#weak_tombstone_reclaimable");

let created_at = {
let bytes = block
Expand Down Expand Up @@ -196,6 +200,8 @@ impl ParsedMeta {
file_size,
item_count,
tombstone_count,
weak_tombstone_count,
weak_tombstone_reclaimable,
data_block_compression,
index_block_compression,
})
Expand Down
14 changes: 14 additions & 0 deletions src/segment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,20 @@ impl Segment {
self.metadata.tombstone_count
}

/// Returns the number of weak (single delete) tombstones in the `Segment`.
///
/// This is a plain read of the `weak_tombstone_count` field of the segment's
/// metadata; the accessor itself does not scan any data.
#[must_use]
#[doc(hidden)]
pub fn weak_tombstone_count(&self) -> u64 {
self.metadata.weak_tombstone_count
}

/// Returns the number of value entries reclaimable once weak tombstones can be GC'd.
///
/// This is a plain read of the `weak_tombstone_reclaimable` field of the segment's
/// metadata; the accessor itself does not scan any data.
#[must_use]
#[doc(hidden)]
pub fn weak_tombstone_reclaimable(&self) -> u64 {
self.metadata.weak_tombstone_reclaimable
}

/// Returns the ratio of tombstone markers in the `Segment`.
#[must_use]
#[doc(hidden)]
Expand Down
8 changes: 8 additions & 0 deletions src/segment/writer/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ pub struct Metadata {
/// Tombstone count
pub tombstone_count: usize,

/// Weak tombstone (single delete) count
pub weak_tombstone_count: usize,

/// Weak tombstone + value pairs that become reclaimable when GC watermark advances
pub weak_tombstone_reclaimable_count: usize,

// TODO: 3.0.0 - https://github.com/fjall-rs/lsm-tree/issues/101
/// Written key count (unique keys)
pub key_count: usize,
Expand Down Expand Up @@ -44,6 +50,8 @@ impl Default for Metadata {

item_count: 0,
tombstone_count: 0,
weak_tombstone_count: 0,
weak_tombstone_reclaimable_count: 0,
key_count: 0,
file_pos: BlockOffset(0),
uncompressed_size: 0,
Expand Down
47 changes: 38 additions & 9 deletions src/segment/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::{
};
use crate::{
coding::Encode, file::fsync_directory, segment::filter::standard_bloom::Builder,
time::unix_timestamp, CompressionType, InternalValue, SegmentId, UserKey,
time::unix_timestamp, CompressionType, InternalValue, SegmentId, UserKey, ValueType,
};
use index::{BlockIndexWriter, FullIndexWriter};
use std::{fs::File, io::BufWriter, path::PathBuf};
Expand Down Expand Up @@ -61,6 +61,9 @@ pub struct Writer {
///
/// using enhanced double hashing, so we got two u64s
pub bloom_hash_buffer: Vec<u64>,

/// Tracks the previously written item to detect weak tombstone/value pairs
previous_item: Option<(UserKey, ValueType)>,
}

impl Writer {
Expand Down Expand Up @@ -103,6 +106,8 @@ impl Writer {
bloom_policy: BloomConstructionPolicy::default(),

bloom_hash_buffer: Vec::new(),

previous_item: None,
})
}

Expand Down Expand Up @@ -171,33 +176,49 @@ impl Writer {
/// sorted as described by the [`UserKey`], otherwise the block layout will
/// be nonsense.
pub fn write(&mut self, item: InternalValue) -> crate::Result<()> {
let value_type = item.key.value_type;
let seqno = item.key.seqno;
let user_key = item.key.user_key.clone();
let value_len = item.value.len();

if item.is_tombstone() {
self.meta.tombstone_count += 1;
}

if value_type == ValueType::WeakTombstone {
self.meta.weak_tombstone_count += 1;
}

if value_type == ValueType::Value {
if let Some((prev_key, prev_type)) = &self.previous_item {
if prev_type == &ValueType::WeakTombstone && prev_key.as_ref() == user_key.as_ref()
{
self.meta.weak_tombstone_reclaimable_count += 1;
}
}
}

// NOTE: Check if we visit a new key
if Some(&item.key.user_key) != self.current_key.as_ref() {
if Some(&user_key) != self.current_key.as_ref() {
self.meta.key_count += 1;
self.current_key = Some(item.key.user_key.clone());
self.current_key = Some(user_key.clone());

// IMPORTANT: Do not buffer *every* item's key
// because there may be multiple versions
// of the same key

if self.bloom_policy.is_active() {
self.bloom_hash_buffer
.push(Builder::get_hash(&item.key.user_key));
self.bloom_hash_buffer.push(Builder::get_hash(&user_key));
}
}

let seqno = item.key.seqno;

if self.meta.first_key.is_none() {
self.meta.first_key = Some(item.key.user_key.clone());
self.meta.first_key = Some(user_key.clone());
}

self.chunk_size += item.key.user_key.len() + item.value.len();
self.chunk_size += user_key.len() + value_len;
self.chunk.push(item);
self.previous_item = Some((user_key, value_type));

if self.chunk_size >= self.data_block_size as usize {
self.spill_block()?;
Expand Down Expand Up @@ -403,6 +424,14 @@ impl Writer {
"#user_data_size",
&self.meta.uncompressed_size.to_le_bytes(),
),
meta(
"#weak_tombstone_count",
&(self.meta.weak_tombstone_count as u64).to_le_bytes(),
),
meta(
"#weak_tombstone_reclaimable",
&(self.meta.weak_tombstone_reclaimable_count as u64).to_le_bytes(),
),
meta("v#lsmt", env!("CARGO_PKG_VERSION").as_bytes()),
meta("v#table_version", &[3u8]),
// TODO: tli_handle_count
Expand Down
24 changes: 24 additions & 0 deletions src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,30 @@ impl AbstractTree for Tree {
.sum()
}

/// Returns the number of weak tombstones (single deletes) in the tree.
///
/// The result is the sum of [`Segment::weak_tombstone_count`] over every segment
/// yielded by the manifest's current version.
///
/// # Panics
///
/// Panics if the manifest lock is poisoned.
// NOTE: no `#[must_use]` here: on a function in a trait implementation the
// attribute does nothing, so it would have to live on the trait declaration.
fn weak_tombstone_count(&self) -> u64 {
    self.manifest
        .read()
        .expect("lock is poisoned")
        .current_version()
        .iter_segments()
        .map(Segment::weak_tombstone_count)
        .sum()
}

/// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
///
/// The result is the sum of [`Segment::weak_tombstone_reclaimable`] over every segment
/// yielded by the manifest's current version.
///
/// # Panics
///
/// Panics if the manifest lock is poisoned.
// NOTE: no `#[must_use]` here: on a function in a trait implementation the
// attribute does nothing, so it would have to live on the trait declaration.
fn weak_tombstone_reclaimable_count(&self) -> u64 {
    self.manifest
        .read()
        .expect("lock is poisoned")
        .current_version()
        .iter_segments()
        .map(Segment::weak_tombstone_reclaimable)
        .sum()
}

fn ingest(
&self,
iter: impl Iterator<Item = (UserKey, UserValue)>,
Expand Down
57 changes: 57 additions & 0 deletions tests/segment_weak_tombstones.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use lsm_tree::{AbstractTree, Config};

/// One value shadowed by one weak tombstone: expects a weak tombstone count of 1
/// and a reclaimable count of 1 after flushing.
#[test]
fn weak_tombstone_counts_single_pair() -> lsm_tree::Result<()> {
    let dir = tempfile::tempdir()?;
    let tree = Config::new(dir.path()).open()?;

    // Value at seqno 1, weak tombstone for the same key at seqno 2.
    tree.insert(b"a", b"old", 1);
    tree.remove_weak(b"a", 2);

    // The counters are checked after the memtable has been flushed.
    tree.flush_active_memtable(0)?;

    let weak_total = tree.weak_tombstone_count();
    assert_eq!(weak_total, 1);

    let reclaimable_total = tree.weak_tombstone_reclaimable_count();
    assert_eq!(reclaimable_total, 1);

    Ok(())
}

/// Three keys, each with one weak tombstone: expects a weak tombstone count of 3
/// and a reclaimable count of 2 after flushing.
#[test]
fn weak_tombstone_counts_multiple_keys() -> lsm_tree::Result<()> {
    let dir = tempfile::tempdir()?;
    let tree = Config::new(dir.path()).open()?;

    // Key "a": one value, then a weak tombstone.
    tree.insert(b"a", b"old", 10);
    tree.remove_weak(b"a", 11);

    // Key "b": a weak tombstone with no value underneath.
    tree.remove_weak(b"b", 12);

    // Key "c": two versions of the value, then a weak tombstone.
    tree.insert(b"c", b"old", 13);
    tree.insert(b"c", b"new", 14);
    tree.remove_weak(b"c", 15);

    tree.flush_active_memtable(0)?;

    let weak_total = tree.weak_tombstone_count();
    assert_eq!(weak_total, 3);

    let reclaimable_total = tree.weak_tombstone_reclaimable_count();
    assert_eq!(reclaimable_total, 2);

    Ok(())
}

/// One value followed by four weak tombstones on the same key: expects a weak
/// tombstone count of 4 and a reclaimable count of 1 after flushing.
#[test]
fn weak_tombstone_counts_multiple_weak() -> lsm_tree::Result<()> {
    let dir = tempfile::tempdir()?;
    let tree = Config::new(dir.path()).open()?;

    tree.insert(b"a", b"old", 10);

    // Stack four weak tombstones on top of the value, at seqnos 11 through 14.
    for seqno in 11..=14 {
        tree.remove_weak(b"a", seqno);
    }

    tree.flush_active_memtable(0)?;

    let weak_total = tree.weak_tombstone_count();
    assert_eq!(weak_total, 4);

    // a:10 is paired with a:11
    let reclaimable_total = tree.weak_tombstone_reclaimable_count();
    assert_eq!(reclaimable_total, 1);

    Ok(())
}