Skip to content

Commit

Permalink
added the fsst encoding step, code is compiling successfully
Browse files Browse the repository at this point in the history
  • Loading branch information
Tishj committed Jan 8, 2025
1 parent 468db86 commit 96297d7
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 48 deletions.
5 changes: 3 additions & 2 deletions src/include/duckdb/storage/compression/dict_fsst/analyze.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ struct DictFSSTAnalyzeState : public DictFSSTCompressionState {
void AddNewString(string_t str) override;
void AddLookup(uint32_t lookup_result) override;
void AddNull() override;
bool HasRoomForString(bool new_string, idx_t string_size) override;
idx_t RequiredSpace(bool new_string, idx_t string_size) override;
void Flush(bool final = false) override;
void ProcessStrings(UnifiedVectorFormat &input, idx_t count) override;
void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) override;
bool EncodeDictionary() override;
const string_t &GetString(const string_t *strings, idx_t index, idx_t raw_index) override;
void Verify() override;

Expand Down
20 changes: 15 additions & 5 deletions src/include/duckdb/storage/compression/dict_fsst/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ typedef struct {
bool fsst_encoded;
} dict_fsst_compression_header_t;

enum class DictionaryAppendState : uint8_t {
REGULAR, //! Symbol table threshold not reached yet
ENCODED, //! Reached the threshold, decided to encode the dictionary
NOT_ENCODED //! Reached the threshold, decided not to encode the dictionary
};

struct DictFSSTCompression {
public:
static constexpr float MINIMUM_COMPRESSION_RATIO = 1.2F;
Expand Down Expand Up @@ -54,18 +60,22 @@ class DictFSSTCompressionState : public CompressionState {
virtual void AddNewString(string_t str) = 0;
// Add a null value to the compression state
virtual void AddNull() = 0;
// Needs to be called before adding a value. Will return false if a flush is required first.
virtual bool HasRoomForString(bool new_string, idx_t string_size) = 0;
virtual idx_t RequiredSpace(bool new_string, idx_t string_size) = 0;
// Flush the segment to disk if compressing or reset the counters if analyzing
virtual void Flush(bool final = false) = 0;
// Process the strings of the vector if necessary
virtual void ProcessStrings(UnifiedVectorFormat &input, idx_t count) = 0;
virtual void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) = 0;
// Encode the dictionary with FSST, return false if we decided not to encode
virtual bool EncodeDictionary() = 0;
// Retrieve the string given the indices
virtual const string_t &GetString(const string_t *strings, idx_t index, idx_t raw_index) = 0;

private:
bool DryAppendToCurrentSegment(bool is_new, UnifiedVectorFormat &vdata, idx_t count, idx_t index, idx_t raw_index);

protected:
//! Whether the dictionary has been encoded with FSST
bool fsst_encoded = false;
//! Keep track of the append state for the current segment
DictionaryAppendState append_state = DictionaryAppendState::REGULAR;
};

} // namespace dict_fsst
Expand Down
27 changes: 25 additions & 2 deletions src/include/duckdb/storage/compression/dict_fsst/compression.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,29 @@ namespace dict_fsst {
// scanning the whole dictionary at once and then scanning the selection buffer for each emitted vector. Secondly, it
// allows for efficient bitpacking compression as the selection values should remain relatively small.

struct EncodedInputData {
public:
explicit EncodedInputData(Allocator &allocator) : heap(allocator) {
}

public:
void Reset() {
input_data.clear();
heap.Destroy();
}

public:
StringHeap heap;
vector<string_t> input_data;
};

//===--------------------------------------------------------------------===//
// Compress
//===--------------------------------------------------------------------===//
struct DictFSSTCompressionCompressState : public DictFSSTCompressionState {
public:
static constexpr idx_t DICTIONARY_ENCODE_THRESHOLD = 4096;

public:
DictFSSTCompressionCompressState(ColumnDataCheckpointData &checkpoint_data_p, const CompressionInfo &info);

Expand All @@ -33,9 +52,10 @@ struct DictFSSTCompressionCompressState : public DictFSSTCompressionState {
void AddNewString(string_t str) override;
void AddNull() override;
void AddLookup(uint32_t lookup_result) override;
bool HasRoomForString(bool new_string, idx_t string_size) override;
idx_t RequiredSpace(bool new_string, idx_t string_size) override;
void Flush(bool final = false) override;
void ProcessStrings(UnifiedVectorFormat &input, idx_t count) override;
void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) override;
bool EncodeDictionary() override;
const string_t &GetString(const string_t *strings, idx_t index, idx_t raw_index) override;
idx_t Finalize();

Expand All @@ -56,6 +76,9 @@ struct DictFSSTCompressionCompressState : public DictFSSTCompressionState {

bitpacking_width_t current_width = 0;
bitpacking_width_t next_width = 0;

EncodedInputData encoded_input;
void *encoder;
};

} // namespace dict_fsst
Expand Down
20 changes: 12 additions & 8 deletions src/storage/compression/dict_fsst/analyze.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,26 @@ void DictFSSTAnalyzeState::AddNull() {
current_tuple_count++;
}

void DictFSSTAnalyzeState::ProcessStrings(UnifiedVectorFormat &input, idx_t count) {
return;
void DictFSSTAnalyzeState::EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) {
throw InternalException("We should never encode during analyze step");
}

bool DictFSSTAnalyzeState::EncodeDictionary() {
return false;
}

const string_t &DictFSSTAnalyzeState::GetString(const string_t *strings, idx_t index, idx_t raw_index) {
return strings[index];
}

bool DictFSSTAnalyzeState::HasRoomForString(bool new_string, idx_t string_size) {
idx_t DictFSSTAnalyzeState::RequiredSpace(bool new_string, idx_t string_size) {
if (!new_string) {
return DictFSSTCompression::HasEnoughSpace(current_tuple_count + 1, current_unique_count, current_dict_size,
current_width, info.GetBlockSize());
return DictFSSTCompression::RequiredSpace(current_tuple_count + 1, current_unique_count, current_dict_size,
current_width);
}
next_width = BitpackingPrimitives::MinimumBitWidth(current_unique_count + 2); // 1 for null, one for new string
return DictFSSTCompression::HasEnoughSpace(current_tuple_count + 1, current_unique_count + 1,
current_dict_size + string_size, next_width, info.GetBlockSize());
auto next_width = BitpackingPrimitives::MinimumBitWidth(current_unique_count + 2); // 1 for null, one for new string
return DictFSSTCompression::RequiredSpace(current_tuple_count + 1, current_unique_count + 1,
current_dict_size + string_size, next_width);
}

void DictFSSTAnalyzeState::Flush(bool final) {
Expand Down
54 changes: 47 additions & 7 deletions src/storage/compression/dict_fsst/common.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#include "duckdb/storage/compression/dict_fsst/common.hpp"
#include "fsst.h"

static constexpr uint16_t FSST_SYMBOL_TABLE_SIZE = sizeof(duckdb_fsst_decoder_t);

namespace duckdb {
namespace dict_fsst {
Expand Down Expand Up @@ -42,23 +45,61 @@ DictFSSTCompressionState::DictFSSTCompressionState(const CompressionInfo &info)
DictFSSTCompressionState::~DictFSSTCompressionState() {
}

bool DictFSSTCompressionState::DryAppendToCurrentSegment(bool is_new, UnifiedVectorFormat &vdata, idx_t count,
idx_t index, idx_t raw_index) {
auto strings = vdata.GetData<string_t>(vdata);
reference<const string_t> str = GetString(strings, index, raw_index);
auto required_space = RequiredSpace(is_new, str.get().GetSize());

auto block_size = info.GetBlockSize();

switch (append_state) {
case DictionaryAppendState::REGULAR: {
auto symbol_table_threshold = block_size - FSST_SYMBOL_TABLE_SIZE;
if (required_space > symbol_table_threshold) {
//! Decide whether or not to encode the dictionary
if (EncodeDictionary()) {
EncodeInputStrings(vdata, count);
str = GetString(strings, index, raw_index);
required_space = RequiredSpace(is_new, str.get().GetSize());
if (required_space > block_size) {
//! Even after encoding the dictionary, we can't add this string
return false;
}
}
}
return true;
}
case DictionaryAppendState::ENCODED: {
return required_space <= (block_size - FSST_SYMBOL_TABLE_SIZE);
}
case DictionaryAppendState::NOT_ENCODED: {
return required_space <= block_size;
}
};
}

bool DictFSSTCompressionState::UpdateState(Vector &scan_vector, idx_t count) {
UnifiedVectorFormat vdata;
scan_vector.ToUnifiedFormat(count, vdata);
auto data = UnifiedVectorFormat::GetData<string_t>(vdata);
Verify();

ProcessStrings(vdata, count);
if (append_state == DictionaryAppendState::ENCODED) {
// The dictionary has been encoded
// to look up a string in the dictionary, the input needs to be encoded as well
EncodeInputStrings(vdata, count);
}

for (idx_t i = 0; i < count; i++) {
auto idx = vdata.sel->get_index(i);
idx_t string_size = 0;
optional_idx lookup_result;
auto row_is_valid = vdata.validity.RowIsValid(idx);

auto &str = GetString(data, idx, i);
reference<const string_t> str = GetString(data, idx, i);
if (row_is_valid) {
string_size = str.GetSize();
string_size = str.get().GetSize();
if (string_size >= StringUncompressed::GetStringBlockLimit(info.GetBlockSize())) {
// Big strings not implemented for dictionary compression
return false;
Expand All @@ -67,13 +108,12 @@ bool DictFSSTCompressionState::UpdateState(Vector &scan_vector, idx_t count) {
}

bool new_string = !lookup_result.IsValid();
bool fits = HasRoomForString(new_string, string_size);
bool fits = DryAppendToCurrentSegment(new_string, vdata, count, idx, i);

if (!fits) {
// TODO: Check if the dictionary requires FSST encoding
Flush();
lookup_result = optional_idx();

fits = HasRoomForString(true, string_size);
fits = DryAppendToCurrentSegment(true, vdata, count, idx, i);
if (!fits) {
throw InternalException("Dictionary compression could not write to new segment");
}
Expand Down
Loading

0 comments on commit 96297d7

Please sign in to comment.