Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ pub trait AbstractTree {
/// Returns the approximate number of tombstones in the tree.
fn tombstone_count(&self) -> u64;

/// Returns the approximate number of weak tombstones (single deletes) in the tree.
fn weak_tombstone_count(&self) -> u64;

/// Returns the approximate number of values reclaimable once weak tombstones can be GC'd.
fn weak_tombstone_reclaimable_count(&self) -> u64;

// TODO: clear() with Nuke compaction strategy (write lock) -> drop_range(..)

/// Drops segments that are fully contained in a given range.
Expand Down
8 changes: 8 additions & 0 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ impl AbstractTree for BlobTree {
self.index.tombstone_count()
}

/// Returns the weak tombstone (single delete) count by forwarding the call to `self.index`.
fn weak_tombstone_count(&self) -> u64 {
self.index.weak_tombstone_count()
}

/// Returns the weak-tombstone reclaimable count by forwarding the call to `self.index`.
fn weak_tombstone_reclaimable_count(&self) -> u64 {
self.index.weak_tombstone_reclaimable_count()
}

/// Drops the given key range by forwarding `range` unchanged to `self.index`.
///
/// # Errors
///
/// Returns whatever error `self.index.drop_range` reports.
fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
self.index.drop_range(range)
}
Expand Down
4 changes: 4 additions & 0 deletions src/compaction/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ impl CompactionStrategy for Strategy {
let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];

{
// TODO(weak-tombstone-rewrite): incorporate `Segment::weak_tombstone_count` and
// `Segment::weak_tombstone_reclaimable` when computing level scores so rewrite
// decisions can prioritize segments that would free the most reclaimable values.

// Score first level

// NOTE: We always have at least one level
Expand Down
6 changes: 6 additions & 0 deletions src/segment/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub struct ParsedMeta {
pub file_size: u64,
pub item_count: u64,
pub tombstone_count: u64,
pub weak_tombstone_count: u64,
pub weak_tombstone_reclaimable: u64,

pub data_block_compression: CompressionType,
pub index_block_compression: CompressionType,
Expand Down Expand Up @@ -142,6 +144,8 @@ impl ParsedMeta {
let data_block_count = read_u64!(block, b"#data_block_count");
let index_block_count = read_u64!(block, b"#index_block_count");
let file_size = read_u64!(block, b"#size"); // TODO: 3.0.0 rename file_size
let weak_tombstone_count = read_u64!(block, b"#weak_tombstone_count");
let weak_tombstone_reclaimable = read_u64!(block, b"#weak_tombstone_reclaimable");

let created_at = {
let bytes = block
Expand Down Expand Up @@ -213,6 +217,8 @@ impl ParsedMeta {
file_size,
item_count,
tombstone_count,
weak_tombstone_count,
weak_tombstone_reclaimable,
data_block_compression,
index_block_compression,
})
Expand Down
14 changes: 14 additions & 0 deletions src/segment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,20 @@ impl Segment {
self.metadata.tombstone_count
}

/// Returns the number of weak (single delete) tombstones in the `Segment`.
///
/// The value is read from the `weak_tombstone_count` field of this segment's metadata.
#[must_use]
#[doc(hidden)]
pub fn weak_tombstone_count(&self) -> u64 {
self.metadata.weak_tombstone_count
}

/// Returns the number of value entries reclaimable once weak tombstones can be
/// garbage collected.
///
/// The value is read from the `weak_tombstone_reclaimable` field of this segment's metadata.
#[must_use]
#[doc(hidden)]
pub fn weak_tombstone_reclaimable(&self) -> u64 {
self.metadata.weak_tombstone_reclaimable
}

/// Returns the ratio of tombstone markers in the `Segment`.
#[must_use]
#[doc(hidden)]
Expand Down
8 changes: 8 additions & 0 deletions src/segment/writer/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ pub struct Metadata {
/// Tombstone count
pub tombstone_count: usize,

/// Weak tombstone (single delete) count
pub weak_tombstone_count: usize,

/// Weak tombstone + value pairs that become reclaimable when GC watermark advances
pub weak_tombstone_reclaimable_count: usize,

// TODO: 3.0.0 - https://github.com/fjall-rs/lsm-tree/issues/101
/// Written key count (unique keys)
pub key_count: usize,
Expand Down Expand Up @@ -44,6 +50,8 @@ impl Default for Metadata {

item_count: 0,
tombstone_count: 0,
weak_tombstone_count: 0,
weak_tombstone_reclaimable_count: 0,
key_count: 0,
file_pos: BlockOffset(0),
uncompressed_size: 0,
Expand Down
46 changes: 38 additions & 8 deletions src/segment/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::{
use crate::{
coding::Encode, file::fsync_directory, segment::filter::standard_bloom::Builder,
time::unix_timestamp, vlog::BlobFileId, CompressionType, InternalValue, SegmentId, UserKey,
ValueType,
};
use index::{BlockIndexWriter, FullIndexWriter};
use std::{fs::File, io::BufWriter, path::PathBuf};
Expand Down Expand Up @@ -69,6 +70,9 @@ pub struct Writer {
/// using enhanced double hashing, so we got two u64s
pub bloom_hash_buffer: Vec<u64>,

/// Tracks the previously written item to detect weak tombstone/value pairs
previous_item: Option<(UserKey, ValueType)>,

linked_blob_files: Vec<LinkedFile>,
}

Expand Down Expand Up @@ -113,6 +117,8 @@ impl Writer {

bloom_hash_buffer: Vec::new(),

previous_item: None,

linked_blob_files: Vec::new(),
})
}
Expand Down Expand Up @@ -190,33 +196,49 @@ impl Writer {
/// sorted as described by the [`UserKey`], otherwise the block layout will
/// be nonsense.
pub fn write(&mut self, item: InternalValue) -> crate::Result<()> {
let value_type = item.key.value_type;
let seqno = item.key.seqno;
let user_key = item.key.user_key.clone();
let value_len = item.value.len();

if item.is_tombstone() {
self.meta.tombstone_count += 1;
}

if value_type == ValueType::WeakTombstone {
self.meta.weak_tombstone_count += 1;
}

if value_type == ValueType::Value {
if let Some((prev_key, prev_type)) = &self.previous_item {
if prev_type == &ValueType::WeakTombstone && prev_key.as_ref() == user_key.as_ref()
{
self.meta.weak_tombstone_reclaimable_count += 1;
}
}
}

// NOTE: Check if we visit a new key
if Some(&item.key.user_key) != self.current_key.as_ref() {
if Some(&user_key) != self.current_key.as_ref() {
self.meta.key_count += 1;
self.current_key = Some(item.key.user_key.clone());
self.current_key = Some(user_key.clone());

// IMPORTANT: Do not buffer *every* item's key
// because there may be multiple versions
// of the same key

if self.bloom_policy.is_active() {
self.bloom_hash_buffer
.push(Builder::get_hash(&item.key.user_key));
self.bloom_hash_buffer.push(Builder::get_hash(&user_key));
}
}

let seqno = item.key.seqno;

if self.meta.first_key.is_none() {
self.meta.first_key = Some(item.key.user_key.clone());
self.meta.first_key = Some(user_key.clone());
}

self.chunk_size += item.key.user_key.len() + item.value.len();
self.chunk_size += user_key.len() + value_len;
self.chunk.push(item);
self.previous_item = Some((user_key, value_type));

if self.chunk_size >= self.data_block_size as usize {
self.spill_block()?;
Expand Down Expand Up @@ -431,6 +453,14 @@ impl Writer {
"#user_data_size",
&self.meta.uncompressed_size.to_le_bytes(),
),
meta(
"#weak_tombstone_count",
&(self.meta.weak_tombstone_count as u64).to_le_bytes(),
),
meta(
"#weak_tombstone_reclaimable",
&(self.meta.weak_tombstone_reclaimable_count as u64).to_le_bytes(),
),
meta("v#lsmt", env!("CARGO_PKG_VERSION").as_bytes()),
meta("v#table_version", &[3u8]),
// TODO: tli_handle_count
Expand Down
18 changes: 17 additions & 1 deletion src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,30 @@ impl AbstractTree for Tree {
)
}

// TODO: doctest
/// Returns the number of tombstones in the tree.
fn tombstone_count(&self) -> u64 {
self.current_version()
.iter_segments()
.map(Segment::tombstone_count)
.sum()
}

/// Returns the number of weak tombstones (single deletes) in the tree.
fn weak_tombstone_count(&self) -> u64 {
self.current_version()
.iter_segments()
.map(Segment::weak_tombstone_count)
.sum()
}

/// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
fn weak_tombstone_reclaimable_count(&self) -> u64 {
self.current_version()
.iter_segments()
.map(Segment::weak_tombstone_reclaimable)
.sum()
}

fn ingest(
&self,
iter: impl Iterator<Item = (UserKey, UserValue)>,
Expand Down
57 changes: 57 additions & 0 deletions tests/segment_weak_tombstones.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use lsm_tree::{AbstractTree, Config};

#[test]
fn weak_tombstone_counts_single_pair() -> lsm_tree::Result<()> {
    let dir = tempfile::tempdir()?;
    let tree = Config::new(dir.path()).open()?;

    // A single value for "a" followed by a weak removal of the same key.
    tree.insert(b"a", b"old", 1);
    tree.remove_weak(b"a", 2);
    tree.flush_active_memtable(0)?;

    // After the flush: one weak tombstone, covering one reclaimable value.
    let weak = tree.weak_tombstone_count();
    assert_eq!(weak, 1);

    let reclaimable = tree.weak_tombstone_reclaimable_count();
    assert_eq!(reclaimable, 1);

    Ok(())
}

#[test]
fn weak_tombstone_counts_multiple_keys() -> lsm_tree::Result<()> {
    let dir = tempfile::tempdir()?;
    let tree = Config::new(dir.path()).open()?;

    // "a": one value, then one weak removal.
    tree.insert(b"a", b"old", 10);
    tree.remove_weak(b"a", 11);

    // "b": a weak removal with no value written for the key.
    tree.remove_weak(b"b", 12);

    // "c": two values, then one weak removal.
    tree.insert(b"c", b"old", 13);
    tree.insert(b"c", b"new", 14);
    tree.remove_weak(b"c", 15);

    tree.flush_active_memtable(0)?;

    // Every key carries one weak tombstone.
    let weak = tree.weak_tombstone_count();
    assert_eq!(weak, 3);

    // Only two values are expected to be reclaimable across "a", "b" and "c".
    let reclaimable = tree.weak_tombstone_reclaimable_count();
    assert_eq!(reclaimable, 2);

    Ok(())
}

#[test]
fn weak_tombstone_counts_multiple_weak() -> lsm_tree::Result<()> {
    let dir = tempfile::tempdir()?;
    let tree = Config::new(dir.path()).open()?;

    // One value for "a", then four weak removals of the same key.
    tree.insert(b"a", b"old", 10);
    for seqno in 11..=14 {
        tree.remove_weak(b"a", seqno);
    }

    tree.flush_active_memtable(0)?;

    // All four weak tombstones are counted.
    let weak = tree.weak_tombstone_count();
    assert_eq!(weak, 4);

    // Only a:10 is reclaimable; it is paired with a:11.
    let reclaimable = tree.weak_tombstone_reclaimable_count();
    assert_eq!(reclaimable, 1);

    Ok(())
}