From 96297d7a4443d447c5a78f13217dcc6352c17689 Mon Sep 17 00:00:00 2001 From: Tishj Date: Wed, 8 Jan 2025 14:06:17 +0100 Subject: [PATCH] added the fsst encoding step, code is compiling successfully --- .../storage/compression/dict_fsst/analyze.hpp | 5 +- .../storage/compression/dict_fsst/common.hpp | 20 ++- .../compression/dict_fsst/compression.hpp | 27 +++- src/storage/compression/dict_fsst/analyze.cpp | 20 ++- src/storage/compression/dict_fsst/common.cpp | 54 ++++++- .../compression/dict_fsst/compression.cpp | 138 +++++++++++++++--- test/issues/general/test_3611.test | 2 +- test/issues/general/test_5488.test | 2 +- .../dictionary_covers_validity.test | 2 +- .../dictionary/dictionary_storage_info.test | 2 +- .../nested/list/list_aggregate_dict.test | 2 +- 11 files changed, 226 insertions(+), 48 deletions(-) diff --git a/src/include/duckdb/storage/compression/dict_fsst/analyze.hpp b/src/include/duckdb/storage/compression/dict_fsst/analyze.hpp index 05ab08e85706..62c2ec5fd086 100644 --- a/src/include/duckdb/storage/compression/dict_fsst/analyze.hpp +++ b/src/include/duckdb/storage/compression/dict_fsst/analyze.hpp @@ -20,9 +20,10 @@ struct DictFSSTAnalyzeState : public DictFSSTCompressionState { void AddNewString(string_t str) override; void AddLookup(uint32_t lookup_result) override; void AddNull() override; - bool HasRoomForString(bool new_string, idx_t string_size) override; + idx_t RequiredSpace(bool new_string, idx_t string_size) override; void Flush(bool final = false) override; - void ProcessStrings(UnifiedVectorFormat &input, idx_t count) override; + void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) override; + bool EncodeDictionary() override; const string_t &GetString(const string_t *strings, idx_t index, idx_t raw_index) override; void Verify() override; diff --git a/src/include/duckdb/storage/compression/dict_fsst/common.hpp b/src/include/duckdb/storage/compression/dict_fsst/common.hpp index 8b0d8f1c8a59..56d2ad22e8d1 100644 --- a/src/include/duckdb/storage/compression/dict_fsst/common.hpp +++ b/src/include/duckdb/storage/compression/dict_fsst/common.hpp @@ -18,6 +18,12 @@ typedef struct { bool fsst_encoded; } dict_fsst_compression_header_t; +enum class DictionaryAppendState : uint8_t { + REGULAR, //! Symbol table threshold not reached yet + ENCODED, //! Reached the threshold, decided to encode the dictionary + NOT_ENCODED //! Reached the threshold, decided not to encode the dictionary +}; + struct DictFSSTCompression { public: static constexpr float MINIMUM_COMPRESSION_RATIO = 1.2F; @@ -54,18 +60,22 @@ class DictFSSTCompressionState : public CompressionState { virtual void AddNewString(string_t str) = 0; // Add a null value to the compression state virtual void AddNull() = 0; - // Needs to be called before adding a value. Will return false if a flush is required first. - virtual bool HasRoomForString(bool new_string, idx_t string_size) = 0; + virtual idx_t RequiredSpace(bool new_string, idx_t string_size) = 0; // Flush the segment to disk if compressing or reset the counters if analyzing virtual void Flush(bool final = false) = 0; // Process the strings of the vector if necessary - virtual void ProcessStrings(UnifiedVectorFormat &input, idx_t count) = 0; + virtual void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) = 0; + // Encode the dictionary with FSST, return false if we decided not to encode + virtual bool EncodeDictionary() = 0; // Retrieve the string given the indices virtual const string_t &GetString(const string_t *strings, idx_t index, idx_t raw_index) = 0; +private: + bool DryAppendToCurrentSegment(bool is_new, UnifiedVectorFormat &vdata, idx_t count, idx_t index, idx_t raw_index); + protected: - //! Whether the dictionary has been encoded with FSST - bool fsst_encoded = false; + //! Keep track of the append state for the current segment + DictionaryAppendState append_state = DictionaryAppendState::REGULAR; }; } // namespace dict_fsst diff --git a/src/include/duckdb/storage/compression/dict_fsst/compression.hpp b/src/include/duckdb/storage/compression/dict_fsst/compression.hpp index f8cc37ac1abc..4447c4dda925 100644 --- a/src/include/duckdb/storage/compression/dict_fsst/compression.hpp +++ b/src/include/duckdb/storage/compression/dict_fsst/compression.hpp @@ -19,10 +19,29 @@ namespace dict_fsst { // scanning the whole dictionary at once and then scanning the selection buffer for each emitted vector. Secondly, it // allows for efficient bitpacking compression as the selection values should remain relatively small. +struct EncodedInputData { +public: + explicit EncodedInputData(Allocator &allocator) : heap(allocator) { + } + +public: + void Reset() { + input_data.clear(); + heap.Destroy(); + } + +public: + StringHeap heap; + vector input_data; +}; + //===--------------------------------------------------------------------===// // Compress //===--------------------------------------------------------------------===// struct DictFSSTCompressionCompressState : public DictFSSTCompressionState { +public: + static constexpr idx_t DICTIONARY_ENCODE_THRESHOLD = 4096; + public: DictFSSTCompressionCompressState(ColumnDataCheckpointData &checkpoint_data_p, const CompressionInfo &info); @@ -33,9 +52,10 @@ struct DictFSSTCompressionCompressState : public DictFSSTCompressionState { void AddNewString(string_t str) override; void AddNull() override; void AddLookup(uint32_t lookup_result) override; - bool HasRoomForString(bool new_string, idx_t string_size) override; + idx_t RequiredSpace(bool new_string, idx_t string_size) override; void Flush(bool final = false) override; - void ProcessStrings(UnifiedVectorFormat &input, idx_t count) override; + void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) override; + bool EncodeDictionary() override; const string_t &GetString(const string_t *strings, idx_t index, idx_t raw_index) override; idx_t Finalize(); @@ -56,6 +76,9 @@ struct DictFSSTCompressionCompressState : public DictFSSTCompressionState { bitpacking_width_t current_width = 0; bitpacking_width_t next_width = 0; + + EncodedInputData encoded_input; + void *encoder; }; } // namespace dict_fsst diff --git a/src/storage/compression/dict_fsst/analyze.cpp b/src/storage/compression/dict_fsst/analyze.cpp index 2b102052fe0f..21b62ee962ea 100644 --- a/src/storage/compression/dict_fsst/analyze.cpp +++ b/src/storage/compression/dict_fsst/analyze.cpp @@ -32,22 +32,26 @@ void DictFSSTAnalyzeState::AddNull() { current_tuple_count++; } -void DictFSSTAnalyzeState::ProcessStrings(UnifiedVectorFormat &input, idx_t count) { - return; +void DictFSSTAnalyzeState::EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) { + throw InternalException("We should never encode during analyze step"); +} + +bool DictFSSTAnalyzeState::EncodeDictionary() { + return false; } const string_t &DictFSSTAnalyzeState::GetString(const string_t *strings, idx_t index, idx_t raw_index) { return strings[index]; } -bool DictFSSTAnalyzeState::HasRoomForString(bool new_string, idx_t string_size) { +idx_t DictFSSTAnalyzeState::RequiredSpace(bool new_string, idx_t string_size) { if (!new_string) { - return DictFSSTCompression::HasEnoughSpace(current_tuple_count + 1, current_unique_count, current_dict_size, - current_width, info.GetBlockSize()); + return DictFSSTCompression::RequiredSpace(current_tuple_count + 1, current_unique_count, current_dict_size, + current_width); } - next_width = BitpackingPrimitives::MinimumBitWidth(current_unique_count + 2); // 1 for null, one for new string - return DictFSSTCompression::HasEnoughSpace(current_tuple_count + 1, current_unique_count + 1, - current_dict_size + string_size, next_width, info.GetBlockSize()); + auto next_width = BitpackingPrimitives::MinimumBitWidth(current_unique_count + 2); // 1 for null, one for new string + return DictFSSTCompression::RequiredSpace(current_tuple_count + 1, current_unique_count + 1, + current_dict_size + string_size, next_width); } void DictFSSTAnalyzeState::Flush(bool final) { diff --git a/src/storage/compression/dict_fsst/common.cpp b/src/storage/compression/dict_fsst/common.cpp index cf2853eea49f..4ea16dd1eabe 100644 --- a/src/storage/compression/dict_fsst/common.cpp +++ b/src/storage/compression/dict_fsst/common.cpp @@ -1,4 +1,7 @@ #include "duckdb/storage/compression/dict_fsst/common.hpp" +#include "fsst.h" + +static constexpr uint16_t FSST_SYMBOL_TABLE_SIZE = sizeof(duckdb_fsst_decoder_t); namespace duckdb { namespace dict_fsst { @@ -42,13 +45,51 @@ DictFSSTCompressionState::DictFSSTCompressionState(const CompressionInfo &info) DictFSSTCompressionState::~DictFSSTCompressionState() { } +bool DictFSSTCompressionState::DryAppendToCurrentSegment(bool is_new, UnifiedVectorFormat &vdata, idx_t count, + idx_t index, idx_t raw_index) { + auto strings = vdata.GetData(vdata); + reference str = GetString(strings, index, raw_index); + auto required_space = RequiredSpace(is_new, str.get().GetSize()); + + auto block_size = info.GetBlockSize(); + + switch (append_state) { + case DictionaryAppendState::REGULAR: { + auto symbol_table_threshold = block_size - FSST_SYMBOL_TABLE_SIZE; + if (required_space > symbol_table_threshold) { + //! Decide whether or not to encode the dictionary + if (EncodeDictionary()) { + EncodeInputStrings(vdata, count); + str = GetString(strings, index, raw_index); + required_space = RequiredSpace(is_new, str.get().GetSize()); + if (required_space > block_size) { + //! Even after encoding the dictionary, we can't add this string + return false; + } + } + } + return true; + } + case DictionaryAppendState::ENCODED: { + return required_space <= (block_size - FSST_SYMBOL_TABLE_SIZE); + } + case DictionaryAppendState::NOT_ENCODED: { + return required_space <= block_size; + } + }; +} + bool DictFSSTCompressionState::UpdateState(Vector &scan_vector, idx_t count) { UnifiedVectorFormat vdata; scan_vector.ToUnifiedFormat(count, vdata); auto data = UnifiedVectorFormat::GetData(vdata); Verify(); - ProcessStrings(vdata, count); + if (append_state == DictionaryAppendState::ENCODED) { + // The dictionary has been encoded + // to look up a string in the dictionary, the input needs to be encoded as well + EncodeInputStrings(vdata, count); + } for (idx_t i = 0; i < count; i++) { auto idx = vdata.sel->get_index(i); @@ -56,9 +97,9 @@ bool DictFSSTCompressionState::UpdateState(Vector &scan_vector, idx_t count) { optional_idx lookup_result; auto row_is_valid = vdata.validity.RowIsValid(idx); - auto &str = GetString(data, idx, i); + reference str = GetString(data, idx, i); if (row_is_valid) { - string_size = str.GetSize(); + string_size = str.get().GetSize(); if (string_size >= StringUncompressed::GetStringBlockLimit(info.GetBlockSize())) { // Big strings not implemented for dictionary compression return false; @@ -67,13 +108,12 @@ bool DictFSSTCompressionState::UpdateState(Vector &scan_vector, idx_t count) { } bool new_string = !lookup_result.IsValid(); - bool fits = HasRoomForString(new_string, string_size); + bool fits = DryAppendToCurrentSegment(new_string, vdata, count, idx, i); + if (!fits) { - // TODO: Check if the dictionary requires FSST encoding Flush(); lookup_result = optional_idx(); - - fits = HasRoomForString(true, string_size); + fits = DryAppendToCurrentSegment(true, vdata, count, idx, i); if (!fits) { throw InternalException("Dictionary compression could not write to new segment"); } diff --git a/src/storage/compression/dict_fsst/compression.cpp b/src/storage/compression/dict_fsst/compression.cpp index 3e228f4eba78..a8282d887b5a 100644 --- a/src/storage/compression/dict_fsst/compression.cpp +++ b/src/storage/compression/dict_fsst/compression.cpp @@ -8,7 +8,8 @@ namespace dict_fsst { DictFSSTCompressionCompressState::DictFSSTCompressionCompressState(ColumnDataCheckpointData &checkpoint_data_p, const CompressionInfo &info) : DictFSSTCompressionState(info), checkpoint_data(checkpoint_data_p), - function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_DICT_FSST)) { + function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_DICT_FSST)), + encoded_input(BufferAllocator::Get(checkpoint_data.GetDatabase())) { CreateEmptySegment(checkpoint_data.GetRowGroup().start); } @@ -98,22 +99,20 @@ void DictFSSTCompressionCompressState::AddLookup(uint32_t lookup_result) { current_segment->count++; } -bool DictFSSTCompressionCompressState::HasRoomForString(bool new_string, idx_t string_size) { - static constexpr uint16_t FSST_SYMBOL_TABLE_SIZE = sizeof(duckdb_fsst_decoder_t); - - auto block_size = info.GetBlockSize(); +idx_t DictFSSTCompressionCompressState::RequiredSpace(bool new_string, idx_t string_size) { if (!new_string) { - return DictFSSTCompression::HasEnoughSpace(current_segment->count.load() + 1, index_buffer.size(), - current_dictionary.size, current_width, block_size); + return DictFSSTCompression::RequiredSpace(current_segment->count.load() + 1, index_buffer.size(), + current_dictionary.size, current_width); } - next_width = BitpackingPrimitives::MinimumBitWidth(index_buffer.size() - 1 + new_string); - return DictFSSTCompression::HasEnoughSpace(current_segment->count.load() + 1, index_buffer.size() + 1, - current_dictionary.size + string_size, next_width, block_size); + auto next_width = BitpackingPrimitives::MinimumBitWidth(index_buffer.size() - 1 + new_string); + return DictFSSTCompression::RequiredSpace(current_segment->count.load() + 1, index_buffer.size() + 1, + current_dictionary.size + string_size, next_width); } void DictFSSTCompressionCompressState::Flush(bool final) { auto next_start = current_segment->start + current_segment->count; + append_state = DictionaryAppendState::REGULAR; auto segment_size = Finalize(); auto &state = checkpoint_data.GetCheckpointState(); @@ -124,21 +123,122 @@ void DictFSSTCompressionCompressState::Flush(bool final) { } } -void DictFSSTCompressionCompressState::ProcessStrings(UnifiedVectorFormat &input, idx_t count) { - if (!fsst_encoded) { - // No need to process anything - return; +void DictFSSTCompressionCompressState::EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) { + D_ASSERT(append_state == DictionaryAppendState::ENCODED); + + encoded_input.Reset(); + D_ASSERT(encoder); + auto fsst_encoder = (duckdb_fsst_encoder_t *)(encoder); + + vector fsst_string_sizes; + vector fsst_string_ptrs; + //! We could use the 'current_string_map' but this won't be in-order + // and we want to preserve the order of the dictionary after rewriting + auto data = input.GetData(input); + idx_t total_size = 0; + for (idx_t i = 0; i < count; i++) { + auto idx = input.sel->get_index(i); + auto &str = data[idx]; + fsst_string_sizes.push_back(str.GetSize()); + fsst_string_ptrs.push_back((unsigned char *)str.GetData()); // NOLINT + total_size += str.GetSize(); + } + + size_t output_buffer_size = 7 + 2 * total_size; // size as specified in fsst.h + auto compressed_ptrs = vector(count, nullptr); + auto compressed_sizes = vector(count, 0); + auto compressed_buffer = make_unsafe_uniq_array_uninitialized(output_buffer_size); + + auto res = + duckdb_fsst_compress(fsst_encoder, count, &fsst_string_sizes[0], &fsst_string_ptrs[0], output_buffer_size, + compressed_buffer.get(), &compressed_sizes[0], &compressed_ptrs[0]); + if (res != count) { + throw FatalException("FSST compression failed to compress all input strings"); + } + + auto &strings = encoded_input.input_data; + auto &heap = encoded_input.heap; + for (idx_t i = 0; i < count; i++) { + uint32_t size = UnsafeNumericCast(compressed_sizes[i]); + string_t encoded_string((const char *)compressed_ptrs[i], size); // NOLINT; + strings.push_back(heap.AddString(std::move(encoded_string))); + } +} + +bool DictFSSTCompressionCompressState::EncodeDictionary() { + if (current_dictionary.size < DICTIONARY_ENCODE_THRESHOLD) { + append_state = DictionaryAppendState::NOT_ENCODED; + return false; + } + append_state = DictionaryAppendState::ENCODED; + + // Encode the dictionary: + // first prepare the input to create the fsst_encoder + // create the encoder + // allocate for enough space to encode the dictionary + // encode the dictionary + // write the (exported) symbol table to the end of the segment + // then rewrite the dictionary + index_buffer + current_string_map + + vector fsst_string_sizes; + vector fsst_string_ptrs; + data_ptr_t last_start = current_end_ptr; + //! We could use the 'current_string_map' but this won't be in-order + // and we want to preserve the order of the dictionary after rewriting + for (auto offset : index_buffer) { + auto start = current_end_ptr - offset; + size_t size = UnsafeNumericCast(last_start - start); + fsst_string_sizes.push_back(size); + fsst_string_ptrs.push_back((unsigned char *)start); // NOLINT + last_start = start; } - throw NotImplementedException("FSST ENCODED PROCESS STRINGS"); - // TODO: perform fsst encoding on the provided strings + + auto string_count = index_buffer.size(); + encoder = duckdb_fsst_create(string_count, &fsst_string_sizes[0], &fsst_string_ptrs[0], 0); + auto fsst_encoder = (duckdb_fsst_encoder_t *)(encoder); + + size_t output_buffer_size = 7 + 2 * current_dictionary.size; // size as specified in fsst.h + auto compressed_ptrs = vector(string_count, nullptr); + auto compressed_sizes = vector(string_count, 0); + auto compressed_buffer = make_unsafe_uniq_array_uninitialized(output_buffer_size); + + auto res = + duckdb_fsst_compress(fsst_encoder, string_count, &fsst_string_sizes[0], &fsst_string_ptrs[0], + output_buffer_size, compressed_buffer.get(), &compressed_sizes[0], &compressed_ptrs[0]); + if (res != string_count) { + throw FatalException("FSST compression failed to compress all dictionary strings"); + } + + // Write the exported symbol table to the end of the segment + unsigned char fsst_serialized_symbol_table[sizeof(duckdb_fsst_decoder_t)]; + auto symbol_table_size = duckdb_fsst_export(fsst_encoder, fsst_serialized_symbol_table); + current_end_ptr -= symbol_table_size; + memcpy(current_end_ptr, (void *)fsst_serialized_symbol_table, symbol_table_size); + + // Rewrite the dictionary + current_string_map.clear(); + uint32_t offset = 0; + for (idx_t i = 0; i < string_count; i++) { + auto &start = compressed_ptrs[i]; + auto &size = compressed_sizes[i]; + offset += size; + index_buffer[i] = offset; + auto dest = current_end_ptr - offset; + memcpy(dest, start, size); + string_t dictionary_string((const char *)start, UnsafeNumericCast(size)); // NOLINT + current_string_map.insert({dictionary_string, i}); + } + current_dictionary.size = offset; + current_dictionary.end -= symbol_table_size; + DictFSSTCompression::SetDictionary(*current_segment, current_handle, current_dictionary); + return true; } const string_t &DictFSSTCompressionCompressState::GetString(const string_t *strings, idx_t index, idx_t raw_index) { - if (!fsst_encoded) { + if (append_state != DictionaryAppendState::ENCODED) { return strings[index]; } - throw NotImplementedException("FSST ENCODED GET STRING"); - // TODO: look up the encoded string given the 'raw_index' + return encoded_input.input_data[raw_index]; } idx_t DictFSSTCompressionCompressState::Finalize() { diff --git a/test/issues/general/test_3611.test b/test/issues/general/test_3611.test index c837c191c180..53985d38e848 100644 --- a/test/issues/general/test_3611.test +++ b/test/issues/general/test_3611.test @@ -9,7 +9,7 @@ statement ok PRAGMA enable_verification statement ok -PRAGMA force_compression='dictionary' +PRAGMA force_compression='dict_fsst' statement ok CREATE TABLE all_types AS SELECT varchar FROM test_all_types(); diff --git a/test/issues/general/test_5488.test b/test/issues/general/test_5488.test index 999e5028dc3e..91b6e5a24ed6 100644 --- a/test/issues/general/test_5488.test +++ b/test/issues/general/test_5488.test @@ -6,7 +6,7 @@ load __TEST_DIR__/issue_5488.db statement ok -pragma force_compression='dictionary' +pragma force_compression='dict_fsst' statement ok CREATE TABLE test ( col_a TEXT); diff --git a/test/sql/storage/compression/dictionary/dictionary_covers_validity.test b/test/sql/storage/compression/dictionary/dictionary_covers_validity.test index 5b707d8c4af2..3c16f724dab3 100644 --- a/test/sql/storage/compression/dictionary/dictionary_covers_validity.test +++ b/test/sql/storage/compression/dictionary/dictionary_covers_validity.test @@ -23,7 +23,7 @@ INSERT INTO tbl VALUES ( ); statement ok -set force_compression='dictionary'; +set force_compression='dict_fsst'; statement ok force checkpoint; diff --git a/test/sql/storage/compression/dictionary/dictionary_storage_info.test b/test/sql/storage/compression/dictionary/dictionary_storage_info.test index eb6c61a86d84..b2e412e6e7ae 100644 --- a/test/sql/storage/compression/dictionary/dictionary_storage_info.test +++ b/test/sql/storage/compression/dictionary/dictionary_storage_info.test @@ -6,7 +6,7 @@ load __TEST_DIR__/test_dictionary.db statement ok -PRAGMA force_compression = 'dictionary' +PRAGMA force_compression = 'dict_fsst' statement ok CREATE TABLE test (a VARCHAR, b VARCHAR); diff --git a/test/sql/types/nested/list/list_aggregate_dict.test b/test/sql/types/nested/list/list_aggregate_dict.test index 68012ec82da3..47b13cb6d941 100644 --- a/test/sql/types/nested/list/list_aggregate_dict.test +++ b/test/sql/types/nested/list/list_aggregate_dict.test @@ -5,7 +5,7 @@ load __TEST_DIR__/store_dict.db statement ok -pragma force_compression='dictionary'; +pragma force_compression='dict_fsst'; statement ok CREATE TABLE Hosts (ips varchar[]);