Skip to content

Commit

Permalink
slowly getting there, FSST_ONLY is currently still broken
Browse files Browse the repository at this point in the history
  • Loading branch information
Tishj committed Jan 17, 2025
1 parent 246d33e commit 70d8115
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 34 deletions.
4 changes: 0 additions & 4 deletions src/include/duckdb/storage/compression/dict_fsst/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ struct DictFSSTCompression {
//! Dictionary header size at the beginning of the string segment (offset + length)
static constexpr uint16_t DICTIONARY_HEADER_SIZE = sizeof(dict_fsst_compression_header_t);
static constexpr idx_t STRING_SIZE_LIMIT = 16384;

public:
static StringDictionaryContainer GetDictionary(ColumnSegment &segment, BufferHandle &handle);
static void SetDictionary(ColumnSegment &segment, BufferHandle &handle, StringDictionaryContainer container);
};

} // namespace dict_fsst
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ struct DictFSSTCompressionState : public CompressionState {
idx_t tuple_count = 0;
unique_ptr<DictFSSTAnalyzeState> analyze;
bool all_unique = true;
idx_t symbol_table_size = DConstants::INVALID_INDEX;

private:
void *encoder = nullptr;
unsafe_unique_array<unsigned char> fsst_serialized_symbol_table;
idx_t symbol_table_size = DConstants::INVALID_INDEX;
DictionaryAppendState append_state = DictionaryAppendState::REGULAR;
};

Expand Down
61 changes: 36 additions & 25 deletions src/storage/compression/dict_fsst/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ namespace duckdb {
namespace dict_fsst {

DictFSSTCompressionState::DictFSSTCompressionState(ColumnDataCheckpointData &checkpoint_data_p,
unique_ptr<DictFSSTAnalyzeState> &&analyze)
: CompressionState(analyze->info), checkpoint_data(checkpoint_data_p),
function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_DICT_FSST)) {
unique_ptr<DictFSSTAnalyzeState> &&analyze_p)
: CompressionState(analyze_p->info), checkpoint_data(checkpoint_data_p),
function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_DICT_FSST)),
analyze(std::move(analyze_p)) {
CreateEmptySegment(checkpoint_data.GetRowGroup().start);
}

Expand All @@ -21,7 +22,6 @@ DictFSSTCompressionState::~DictFSSTCompressionState() {
}

static constexpr uint16_t DICTIONARY_HEADER_SIZE = sizeof(dict_fsst_compression_header_t);
static constexpr idx_t STRING_SIZE_LIMIT = 16384;
static constexpr uint16_t FSST_SYMBOL_TABLE_SIZE = sizeof(duckdb_fsst_decoder_t);
static constexpr idx_t DICTIONARY_ENCODE_THRESHOLD = 4096;

Expand Down Expand Up @@ -69,6 +69,23 @@ idx_t DictFSSTCompressionState::Finalize() {
auto string_lengths_dest = AlignValue<idx_t>(symbol_table_dest + symbol_table_size);
auto dictionary_indices_dest = AlignValue<idx_t>(string_lengths_dest + string_lengths_space);

#ifdef DEBUG
idx_t required_space = 0;
required_space += sizeof(dict_fsst_compression_header_t);
required_space = AlignValue<idx_t>(required_space);
required_space += dictionary_offset;
required_space = AlignValue<idx_t>(required_space);
if (is_fsst_encoded) {
required_space += symbol_table_size;
}
required_space = AlignValue<idx_t>(required_space);
required_space += string_lengths_space;
required_space = AlignValue<idx_t>(required_space);
required_space += dictionary_indices_space;

D_ASSERT(info.GetBlockSize() >= required_space);
#endif

header_ptr->mode = ConvertToMode(append_state);
header_ptr->symbol_table_size = symbol_table_size;
header_ptr->dict_size = dictionary_offset;
Expand All @@ -94,22 +111,6 @@ idx_t DictFSSTCompressionState::Finalize() {
auto expected_bitwidth = BitpackingPrimitives::MinimumBitWidth(dict_count - 1);
D_ASSERT(dictionary_indices_width == expected_bitwidth);
}

#ifdef DEBUG
idx_t required_space = 0;
required_space += sizeof(dict_fsst_compression_header_t);
required_space = AlignValue<idx_t>(required_space);
required_space += dictionary_offset;
if (is_fsst_encoded) {
required_space += symbol_table_size;
}
required_space = AlignValue<idx_t>(required_space);
required_space += dictionary_indices_space;
required_space = AlignValue<idx_t>(required_space);
required_space += string_lengths_space;

D_ASSERT(info.GetBlockSize() >= required_space);
#endif
D_ASSERT((uint64_t)*max_element(std::begin(dictionary_indices), std::end(dictionary_indices)) == dict_count - 1);
return total_size;
}
Expand Down Expand Up @@ -163,7 +164,8 @@ void DictFSSTCompressionState::FlushEncodingBuffer() {
string_lengths_width = BitpackingPrimitives::MinimumBitWidth(biggest_strlen);
}
real_string_lengths_width = string_lengths_width;
string_lengths_space = BitpackingPrimitives::GetRequiredSize(dict_count, string_lengths_space);
string_lengths_space = BitpackingPrimitives::GetRequiredSize(dict_count, string_lengths_width);
D_ASSERT(string_lengths_space != 0);
to_encode_string_sum = 0;
dictionary_encoding_buffer.clear();
}
Expand Down Expand Up @@ -214,10 +216,6 @@ void DictFSSTCompressionState::Flush(bool final) {
auto &state = checkpoint_data.GetCheckpointState();
state.FlushSegment(std::move(current_segment), std::move(current_handle), segment_size);

if (!final) {
CreateEmptySegment(next_start);
}

// Reset the state
uncompressed_dictionary_copy.Destroy();
//! This should already be empty at this point, otherwise that means that strings are not encoded / not added to the
Expand All @@ -234,6 +232,10 @@ void DictFSSTCompressionState::Flush(bool final) {
encoder = nullptr;
symbol_table_size = DConstants::INVALID_INDEX;
}

if (!final) {
CreateEmptySegment(next_start);
}
}

static inline bool RequiresHigherBitWidth(bitpacking_width_t bitwidth, uint32_t other) {
Expand Down Expand Up @@ -261,6 +263,10 @@ static inline bool AddLookup(DictFSSTCompressionState &state, idx_t lookup, cons
required_space += state.dictionary_offset;
}
required_space = AlignValue<idx_t>(required_space);
if (IsEncoded(APPEND_STATE)) {
required_space += state.symbol_table_size;
required_space = AlignValue<idx_t>(required_space);
}
required_space += new_dictionary_indices_space;
required_space = AlignValue<idx_t>(required_space);
required_space += state.string_lengths_space;
Expand Down Expand Up @@ -332,6 +338,10 @@ static inline bool AddToDictionary(DictFSSTCompressionState &state, const string
required_space += state.dictionary_offset + str_len;
}
required_space = AlignValue<idx_t>(required_space);
if (IsEncoded(APPEND_STATE)) {
required_space += state.symbol_table_size;
required_space = AlignValue<idx_t>(required_space);
}
required_space += new_dictionary_indices_space;
required_space = AlignValue<idx_t>(required_space);
required_space += new_string_lengths_space;
Expand Down Expand Up @@ -374,6 +384,7 @@ static inline bool AddToDictionary(DictFSSTCompressionState &state, const string
}
if (requires_higher_strlen_bitwidth || recalculate_strlen_space) {
state.string_lengths_space = new_string_lengths_space;
D_ASSERT(state.string_lengths_space != 0);
}

if (requires_higher_indices_bitwidth) {
Expand Down
10 changes: 7 additions & 3 deletions src/storage/compression/dict_fsst/decompression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ void CompressedStringScanState::Initialize(bool initialize_dictionary) {
(void)(ret);
D_ASSERT(ret != 0); // FIXME: the old code set the decoder to nullptr instead, why???

auto string_block_limit = StringUncompressed::GetStringBlockLimit(segment.GetBlockManager().GetBlockSize());
decompress_buffer.resize(string_block_limit + 1);
//! The biggest string_length covered by the 'string_lengths_width'
uint32_t max_string_length = (1 << string_lengths_width) - 1;
auto buffer_size = (max_string_length * 8) + 1;
decompress_buffer.resize(buffer_size);
break;
}
default:
Expand All @@ -102,8 +104,10 @@ void CompressedStringScanState::Initialize(bool initialize_dictionary) {

int32_t offset = 0;
for (uint32_t i = 0; i < dict_count; i++) {
//! We can uncompress during fetching, we need the length of the string inside the dictionary
auto string_len = string_lengths[i];
dict_child_data[i] = FetchStringFromDict(*dictionary, offset, i);
offset += dict_child_data[i].GetSize();
offset += string_len;
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/sql/storage/compression/dict_fsst/dict_fsst_test.test
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ select
(i % 200)::INTEGER::VARCHAR,
2047 // len((i % 200)::INTEGER::VARCHAR)
) a
from range(20000) t(i);
from range(400) t(i);

statement ok
checkpoint;
Expand Down

0 comments on commit 70d8115

Please sign in to comment.