Skip to content

Commit

Permalink
feat: fsst compression with mini-block (#3121)
Browse files Browse the repository at this point in the history
This PR tries to integrate mini-block page layout with FSST compression.

During compression, it first FSST compresses the input data then write
out the data use `BinaryMiniBlockEncoder`.
During decompression, it first uses `BinaryMiniBlockDecompressor` to
decode the raw data read from disk, it then applies `FSST
decompression`.
  • Loading branch information
broccoliSpicy authored Nov 13, 2024
1 parent ec76db4 commit 3ac6d4a
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 12 deletions.
6 changes: 6 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ message Binary {
message BinaryMiniBlock {
}

message FsstMiniBlock {
ArrayEncoding BinaryMiniBlock = 1;
bytes symbol_table = 2;
}

message Fsst {
ArrayEncoding binary = 1;
bytes symbol_table = 2;
Expand Down Expand Up @@ -273,6 +278,7 @@ message ArrayEncoding {
Constant constant = 13;
Bitpack2 bitpack2 = 14;
BinaryMiniBlock binary_mini_block = 15;
FsstMiniBlock fsst_mini_block = 16;
}
}

Expand Down
7 changes: 5 additions & 2 deletions rust/lance-encoding/src/compression_algo/fsst/src/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ const FSST_CODE_MASK: u16 = FSST_CODE_MAX - 1;
const FSST_SAMPLETARGET: usize = 1 << 14;
const FSST_SAMPLEMAXSZ: usize = 2 * FSST_SAMPLETARGET;

// the the input size is less than 4MB, we mark the file header and copy the input to the output as is
const FSST_LEAST_INPUT_SIZE: usize = 4 * 1024 * 1024; // 4MB
// if the input size is less than 4MB, we mark the file header and copy the input to the output as is
pub const FSST_LEAST_INPUT_SIZE: usize = 4 * 1024 * 1024; // 4MB

// if the max length of the input strings are less than `FSST_LEAST_INPUT_MAX_LENGTH`, we shouldn't use FSST.
pub const FSST_LEAST_INPUT_MAX_LENGTH: u64 = 5;

// we only use the lower 32 bits in icl, so we can use 1 << 32 to represent a free slot in the hash table
const FSST_ICL_FREE: u64 = 1 << 32;
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ use crate::encodings::logical::r#struct::{
use crate::encodings::physical::binary::BinaryMiniBlockDecompressor;
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
use crate::encodings::physical::fixed_size_list::FslPerValueDecompressor;
use crate::encodings::physical::fsst::FsstMiniBlockDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb::{self, column_encoding};
Expand Down Expand Up @@ -505,6 +506,9 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
pb::array_encoding::ArrayEncoding::BinaryMiniBlock(_) => {
Ok(Box::new(BinaryMiniBlockDecompressor::default()))
}
pb::array_encoding::ArrayEncoding::FsstMiniBlock(description) => {
Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
}
_ => todo!(),
}
}
Expand Down
20 changes: 18 additions & 2 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::encodings::physical::bitpack_fastlanes::{
use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme};
use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
use crate::encodings::physical::fixed_size_list::FslPerValueCompressor;
use crate::encodings::physical::fsst::FsstArrayEncoder;
use crate::encodings::physical::fsst::{FsstArrayEncoder, FsstMiniBlockEncoder};
use crate::encodings::physical::packed_struct::PackedStructEncoder;
use crate::format::ProtobufUtils;
use crate::repdef::RepDefBuilder;
Expand All @@ -49,6 +49,7 @@ use crate::{
},
format::pb,
};
use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};

use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use std::collections::hash_map::RandomState;
Expand Down Expand Up @@ -792,7 +793,7 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
if let DataBlock::FixedWidth(ref fixed_width_data) = data {
let bit_widths = data
.get_stat(Stat::BitWidth)
.expect("FixedWidthDataBlock should have valid bit width statistics");
.expect("FixedWidthDataBlock should have valid `Stat::BitWidth` statistics");
// Temporary hack to work around https://github.com/lancedb/lance/issues/3102
// Ideally we should still be able to bit-pack here (either to 0 or 1 bit per value)
let has_all_zeros = bit_widths
Expand All @@ -811,6 +812,21 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
}
if let DataBlock::VariableWidth(ref variable_width_data) = data {
if variable_width_data.bits_per_offset == 32 {
let data_size = variable_width_data.get_stat(Stat::DataSize).expect(
"VariableWidth DataBlock should have valid `Stat::DataSize` statistics",
);
let data_size = data_size.as_primitive::<UInt64Type>().value(0);

let max_len = variable_width_data.get_stat(Stat::MaxLength).expect(
"VariableWidth DataBlock should have valid `Stat::DataSize` statistics",
);
let max_len = max_len.as_primitive::<UInt64Type>().value(0);

if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
&& data_size >= FSST_LEAST_INPUT_SIZE as u64
{
return Ok(Box::new(FsstMiniBlockEncoder::default()));
}
return Ok(Box::new(BinaryMiniBlockEncoder::default()));
}
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance-encoding/src/encodings/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ pub fn decoder_from_array_encoding(
pb::array_encoding::ArrayEncoding::Constant(_) => unreachable!(),
pb::array_encoding::ArrayEncoding::Bitpack2(_) => unreachable!(),
pb::array_encoding::ArrayEncoding::BinaryMiniBlock(_) => unreachable!(),
pb::array_encoding::ArrayEncoding::FsstMiniBlock(_) => unreachable!(),
}
}

Expand Down
154 changes: 148 additions & 6 deletions rust/lance-encoding/src/encodings/physical/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,23 @@ use arrow_buffer::ScalarBuffer;
use arrow_schema::DataType;
use futures::{future::BoxFuture, FutureExt};

use lance_core::Result;
use lance_core::{Error, Result};
use snafu::{location, Location};

use crate::{
buffer::LanceBuffer,
data::{BlockInfo, DataBlock, NullableDataBlock, UsedEncoding, VariableWidthBlock},
decoder::{PageScheduler, PrimitivePageDecoder},
encoder::{ArrayEncoder, EncodedArray},
format::ProtobufUtils,
decoder::{MiniBlockDecompressor, PageScheduler, PrimitivePageDecoder},
encoder::{ArrayEncoder, EncodedArray, MiniBlockCompressed, MiniBlockCompressor},
format::{
pb::{self},
ProtobufUtils,
},
EncodingsIo,
};

use super::binary::{BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder};

#[derive(Debug)]
pub struct FsstPageScheduler {
inner_scheduler: Box<dyn PageScheduler>,
Expand Down Expand Up @@ -201,14 +207,132 @@ impl ArrayEncoder for FsstArrayEncoder {
}
}

#[derive(Debug, Default)]
pub struct FsstMiniBlockEncoder {}

impl MiniBlockCompressor for FsstMiniBlockEncoder {
fn compress(
&self,
data: DataBlock,
) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
match data {
DataBlock::VariableWidth(mut variable_width) => {
let offsets = variable_width.offsets.borrow_to_typed_slice::<i32>();
let offsets_slice = offsets.as_ref();
let bytes_data = variable_width.data.into_buffer();

// prepare compression output buffer
let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
let mut dest_values = vec![0_u8; bytes_data.len() * 2];
let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];

// fsst compression
fsst::fsst::compress(
&mut symbol_table,
bytes_data.as_slice(),
offsets_slice,
&mut dest_values,
&mut dest_offsets,
)?;

// construct `DataBlock` for BinaryMiniBlockEncoder, we may want some `DataBlock` construct methods later
let data_block = DataBlock::VariableWidth(VariableWidthBlock {
data: LanceBuffer::reinterpret_vec(dest_values),
bits_per_offset: 32,
offsets: LanceBuffer::reinterpret_vec(dest_offsets),
num_values: variable_width.num_values,
block_info: BlockInfo::new(),
used_encodings: UsedEncoding::new(),
});

// compress the fsst compressed data using `BinaryMiniBlockEncoder`
let binary_compressor =
Box::new(BinaryMiniBlockEncoder::default()) as Box<dyn MiniBlockCompressor>;

let (binary_miniblock_compressed, binary_array_encoding) =
binary_compressor.compress(data_block)?;

Ok((
binary_miniblock_compressed,
ProtobufUtils::fsst_mini_block(binary_array_encoding, symbol_table),
))
}
_ => Err(Error::InvalidInput {
source: format!(
"Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
data.name()
)
.into(),
location: location!(),
}),
}
}
}

#[derive(Debug)]
pub struct FsstMiniBlockDecompressor {
symbol_table: Vec<u8>,
}

impl FsstMiniBlockDecompressor {
pub fn new(description: &pb::FsstMiniBlock) -> Self {
Self {
symbol_table: description.symbol_table.clone(),
}
}
}

impl MiniBlockDecompressor for FsstMiniBlockDecompressor {
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
// Step 1. decompress data use `BinaryMiniBlockDecompressor`
let binary_decompressor =
Box::new(BinaryMiniBlockDecompressor::default()) as Box<dyn MiniBlockDecompressor>;
let compressed_data_block = binary_decompressor.decompress(data, num_values)?;
let DataBlock::VariableWidth(mut compressed_data_block) = compressed_data_block else {
panic!("BinaryMiniBlockDecompressor should output VariableWidth DataBlock")
};

// Step 2. FSST decompress
let bytes = compressed_data_block.data.borrow_to_typed_slice::<u8>();
let bytes = bytes.as_ref();
let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i32>();
let offsets = offsets.as_ref();

// FSST decompression output buffer, the `MiniBlock` has a size limit of `4 KiB` and
// the FSST decompression algorithm output is at most `8 * input_size`
// Since `MiniBlock Size` <= 4 KiB and `offsets` are type `i32, it has number of `offsets` <= 1024.
let mut decompress_bytes_buf = vec![0u8; 4 * 1024 * 8];
let mut decompress_offset_buf = vec![0i32; 1024];
fsst::fsst::decompress(
&self.symbol_table,
bytes,
offsets,
&mut decompress_bytes_buf,
&mut decompress_offset_buf,
)?;

Ok(DataBlock::VariableWidth(VariableWidthBlock {
data: LanceBuffer::Owned(decompress_bytes_buf),
offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
bits_per_offset: 32,
num_values,
block_info: BlockInfo::new(),
used_encodings: UsedEncoding::new(),
}))
}
}

#[cfg(test)]
mod tests {

use std::collections::HashMap;

use lance_datagen::{ByteCount, RowCount};

use crate::testing::{check_round_trip_encoding_of_data, TestCases};
use crate::{
testing::{check_round_trip_encoding_of_data, TestCases},
version::LanceFileVersion,
};

#[test_log::test(tokio::test)]
async fn test_fsst() {
Expand All @@ -218,6 +342,24 @@ mod tests {
.unwrap()
.column(0)
.clone();
check_round_trip_encoding_of_data(vec![arr], &TestCases::default(), HashMap::new()).await;
check_round_trip_encoding_of_data(
vec![arr],
&TestCases::default().with_file_version(LanceFileVersion::V2_1),
HashMap::new(),
)
.await;

let arr = lance_datagen::gen()
.anon_col(lance_datagen::array::rand_utf8(ByteCount::from(64), false))
.into_batch_rows(RowCount::from(1_000_000))
.unwrap()
.column(0)
.clone();
check_round_trip_encoding_of_data(
vec![arr],
&TestCases::default().with_file_version(LanceFileVersion::V2_1),
HashMap::new(),
)
.await;
}
}
16 changes: 14 additions & 2 deletions rust/lance-encoding/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use pb::{
nullable::{AllNull, NoNull, Nullability, SomeNull},
page_layout::Layout,
AllNullLayout, ArrayEncoding, Binary, BinaryMiniBlock, Bitpack2, Bitpacked, BitpackedForNonNeg,
Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, MiniBlockLayout, Nullable,
PackedStruct, PageLayout,
Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, FsstMiniBlock, MiniBlockLayout,
Nullable, PackedStruct, PageLayout,
};

use crate::encodings::physical::block_compress::CompressionConfig;
Expand Down Expand Up @@ -139,6 +139,18 @@ impl ProtobufUtils {
}
}

// Construct a `FsstMiniBlock` ArrayEncoding, the inner `binary_mini_block` encoding is actually
// not used and `FsstMiniBlockDecompressor` constructs a `binary_mini_block` in a `hard-coded` fashion.
// This can be an optimization later.
pub fn fsst_mini_block(data: ArrayEncoding, symbol_table: Vec<u8>) -> ArrayEncoding {
ArrayEncoding {
array_encoding: Some(ArrayEncodingEnum::FsstMiniBlock(Box::new(FsstMiniBlock {
binary_mini_block: Some(Box::new(data)),
symbol_table,
}))),
}
}

pub fn packed_struct(
child_encodings: Vec<ArrayEncoding>,
packed_buffer_index: u32,
Expand Down

0 comments on commit 3ac6d4a

Please sign in to comment.