Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ path = "src/lib.rs"
[features]
default = []
lz4 = ["dep:lz4_flex"]
zlib = ["dep:flate2"]
bytes_1 = ["dep:bytes"]
metrics = []
use_unsafe = [] # TODO: 3.0.0 remove
Expand All @@ -32,6 +33,7 @@ enum_dispatch = "0.3.13"
interval-heap = "0.0.5"
log = "0.4.27"
lz4_flex = { version = "0.11.5", optional = true, default-features = false }
flate2 = { version = "1.0.17", optional = true, default-features = false, features = ["zlib-rs"] }
quick_cache = { version = "0.6.16", default-features = false, features = [] }
rustc-hash = "2.1.1"
self_cell = "1.2.0"
Expand Down
16 changes: 10 additions & 6 deletions src/coding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ use std::io::{Read, Write};
pub enum EncodeError {
/// I/O error
Io(std::io::Error),
/// Invalid compression level
///
/// Carries the compression type being encoded and the rejected level.
InvalidCompressionLevel((crate::CompressionType, i32)),
}

impl std::fmt::Display for EncodeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"EncodeError({})",
match self {
Self::Io(e) => e.to_string(),
match self {
Self::Io(e) => write!(f, "EncodeError(Io: {})", e),
Self::InvalidCompressionLevel((compression_type, level)) => {
write!(f, "EncodeError(InvalidCompressionLevel: {compression_type:?} level {level})")
}
)
}
}
}

Expand All @@ -33,6 +33,7 @@ impl std::error::Error for EncodeError {
/// Returns the underlying cause of this error, if it wraps one.
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
    match self {
        // A rejected compression level has no underlying error.
        Self::InvalidCompressionLevel(_) => None,
        // The I/O variant exposes the wrapped `std::io::Error`.
        Self::Io(io_err) => Some(io_err),
    }
}
}
Expand All @@ -57,6 +58,8 @@ pub enum DecodeError {

/// UTF-8 error
Utf8(std::str::Utf8Error),

InvalidCompressionLevel((crate::CompressionType, i32)),
}

impl std::fmt::Display for DecodeError {
Expand All @@ -66,6 +69,7 @@ impl std::fmt::Display for DecodeError {
"DecodeError({})",
match self {
Self::Io(e) => e.to_string(),
Self::InvalidCompressionLevel((compression_type, level)) => format!("InvalidCompressionLevel: {compression_type:?} level {level}"),
e => format!("{e:?}"),
}
)
Expand Down
49 changes: 49 additions & 0 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError};
use byteorder::{ReadBytesExt, WriteBytesExt};
use std::io::{Read, Write};

/// Highest zlib compression level accepted by this module.
///
/// Encoding or decoding a `CompressionType::Zlib` with a level above this
/// value fails with an `InvalidCompressionLevel` error.
#[cfg(feature = "zlib")]
const ZLIB_MAX_LEVEL: u8 = 9;

/// Compression algorithm to use
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[allow(clippy::module_name_repetitions)]
Expand All @@ -21,6 +24,10 @@ pub enum CompressionType {
/// on speed over compression ratio.
#[cfg(feature = "lz4")]
Lz4,

/// Zlib compression
#[cfg(feature = "zlib")]
Zlib(u8),
}

impl Encode for CompressionType {
Expand All @@ -34,6 +41,15 @@ impl Encode for CompressionType {
Self::Lz4 => {
writer.write_u8(1)?;
}

#[cfg(feature = "zlib")]
Self::Zlib(level) => {
if *level > ZLIB_MAX_LEVEL {
return Err(EncodeError::InvalidCompressionLevel((Self::Zlib(*level), *level as i32)));
}
writer.write_u8(2)?;
writer.write_u8(*level)?;
}
}

Ok(())
Expand All @@ -50,6 +66,14 @@ impl Decode for CompressionType {
#[cfg(feature = "lz4")]
1 => Ok(Self::Lz4),

#[cfg(feature = "zlib")]
2 => {
let level = reader.read_u8()?;
if level > ZLIB_MAX_LEVEL {
return Err(DecodeError::InvalidCompressionLevel((Self::Zlib(level), level as i32)));
}
Ok(Self::Zlib(level))
}
tag => Err(DecodeError::InvalidTag(("CompressionType", tag))),
}
}
Expand All @@ -65,6 +89,11 @@ impl std::fmt::Display for CompressionType {

#[cfg(feature = "lz4")]
Self::Lz4 => "lz4",

#[cfg(feature = "zlib")]
Self::Zlib(level) => {
return write!(f, "zlib (level {})", level);
}
}
)
}
Expand Down Expand Up @@ -92,4 +121,24 @@ mod tests {
assert_eq!(1, serialized.len());
}
}

#[cfg(feature = "zlib")]
mod zlib {
    use super::*;
    use test_log::test;

    /// Every supported level serializes to the zlib tag byte (2) followed by
    /// the level byte.
    #[test]
    fn compression_serialize_zlib() {
        for level in 0..=ZLIB_MAX_LEVEL {
            let serialized = CompressionType::Zlib(level).encode_into_vec();
            assert_eq!(2, serialized.len());
            // Pin the bytes themselves, not just the length.
            assert_eq!(&[2u8, level][..], &*serialized);
        }
    }

    /// The first level past the maximum is rejected, and the error reports
    /// both the compression type and the offending level.
    #[test]
    fn compression_serialize_zlib_invalid_level() {
        // Derived from the constant so the test follows any change to the limit.
        let invalid_level = ZLIB_MAX_LEVEL + 1;
        let err = CompressionType::Zlib(invalid_level).encode_into_vec_err();
        assert!(matches!(
            err,
            Err(EncodeError::InvalidCompressionLevel((CompressionType::Zlib(level), reported)))
                if level == invalid_level && reported == i32::from(invalid_level)
        ));
    }
}
}
43 changes: 39 additions & 4 deletions src/segment/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ use crate::{
segment::BlockHandle,
CompressionType, Slice,
};
use std::borrow::Cow;
use std::fs::File;
use std::io::{Read, Write};

#[cfg(feature = "zlib")]
use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression as ZCompression};

/// A block on disk
///
Expand Down Expand Up @@ -56,16 +61,24 @@ impl Block {
previous_block_offset: BlockOffset(0), // <-- TODO:
};

let data = match compression {
CompressionType::None => data,
let data: Cow<[u8]> = match compression {
CompressionType::None => Cow::Borrowed(data),

#[cfg(feature = "lz4")]
CompressionType::Lz4 => &lz4_flex::compress(data),
CompressionType::Lz4 => Cow::Owned((lz4_flex::compress(data))),

#[cfg(feature = "zlib")]
CompressionType::Zlib(level) => {
let lvl = level as u32;
let mut e = ZlibEncoder::new(Vec::new(), ZCompression::new(lvl));
e.write_all(data)?;
Cow::Owned(e.finish()?)
}
};
header.data_length = data.len() as u32;

header.encode_into(&mut writer)?;
writer.write_all(data)?;
writer.write_all(&data)?;

log::trace!(
"Writing block with size {}B (compressed: {}B) (excluding header of {}B)",
Expand Down Expand Up @@ -100,6 +113,16 @@ impl Block {

builder.freeze().into()
}

#[cfg(feature = "zlib")]
CompressionType::Zlib(_level) => {
let mut d = ZlibDecoder::new(&raw_data[..]);
let mut decompressed_data =
unsafe { Slice::builder_unzeroed(header.uncompressed_length as usize) };
d.read_exact(&mut decompressed_data)
.map_err(|_| crate::Error::Decompress(compression))?;
decompressed_data.freeze().into()
}
};

debug_assert_eq!(header.uncompressed_length, {
Expand Down Expand Up @@ -191,6 +214,18 @@ impl Block {

builder.freeze()
}

#[cfg(feature = "zlib")]
CompressionType::Zlib(_level) => {
#[allow(clippy::indexing_slicing)]
let raw_data = &buf[Header::serialized_len()..];
let mut d = ZlibDecoder::new(raw_data);
let mut decompressed_data =
unsafe { Slice::builder_unzeroed(header.uncompressed_length as usize) };
d.read_exact(&mut decompressed_data)
.map_err(|_| crate::Error::Decompress(compression))?;
decompressed_data.freeze()
}
};

#[allow(clippy::expect_used, clippy::cast_possible_truncation)]
Expand Down
Loading