From f7344ebf57023a06aec0c7f40c04d3cb65970752 Mon Sep 17 00:00:00 2001 From: Tishj Date: Thu, 16 Jan 2025 12:57:55 +0100 Subject: [PATCH] WIP --- .../storage/compression/dict_fsst/analyze.hpp | 45 +-- .../storage/compression/dict_fsst/common.hpp | 1 + .../compression/dict_fsst/compression.hpp | 80 ++--- src/storage/compression/dict_fsst.cpp | 16 +- src/storage/compression/dict_fsst/analyze.cpp | 225 ++----------- src/storage/compression/dict_fsst/common.cpp | 5 +- .../compression/dict_fsst/compression.cpp | 311 +++++++++++++++++- 7 files changed, 384 insertions(+), 299 deletions(-) diff --git a/src/include/duckdb/storage/compression/dict_fsst/analyze.hpp b/src/include/duckdb/storage/compression/dict_fsst/analyze.hpp index 4e3576ab046..ba2ce209776 100644 --- a/src/include/duckdb/storage/compression/dict_fsst/analyze.hpp +++ b/src/include/duckdb/storage/compression/dict_fsst/analyze.hpp @@ -11,50 +11,19 @@ namespace dict_fsst { //===--------------------------------------------------------------------===// // Analyze //===--------------------------------------------------------------------===// -struct DictFSSTAnalyzeState : public DictFSSTCompressionState { +struct DictFSSTAnalyzeState : public AnalyzeState { public: explicit DictFSSTAnalyzeState(const CompressionInfo &info); - ~DictFSSTAnalyzeState() override; public: - optional_idx LookupString(const string_t &str) override; - void AddNewString(const StringData &str) override; - void AddLookup(uint32_t lookup_result) override; - void AddNull() override; - idx_t RequiredSpace(bool new_string, idx_t string_size) override; - void Flush(bool final = false) override; - void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) override; - bool EncodeDictionary() override; - StringData GetString(const string_t *strings, idx_t index, idx_t raw_index) override; - void Verify() override; + bool Analyze(Vector &input, idx_t count); + idx_t FinalAnalyze(); public: - idx_t segment_count; - idx_t current_tuple_count; - idx_t current_unique_count; - idx_t current_dict_size; - StringHeap heap; - //! string -> string_length - string_map_t current_string_map; - bitpacking_width_t current_width; - bitpacking_width_t next_width; - - bitpacking_width_t string_length_bitwidth = NumericLimits::Maximum(); - uint32_t max_length = 0; - - idx_t total_space = 0; - unsafe_unique_array compression_buffer; - idx_t compression_buffer_size = 0; -}; - -struct DictFSSTCompressionAnalyzeState : public AnalyzeState { -public: - explicit DictFSSTCompressionAnalyzeState(const CompressionInfo &info) - : AnalyzeState(info), analyze_state(make_uniq(info)) { - } - -public: - unique_ptr analyze_state; + idx_t max_string_length = 0; + bool contains_nulls = false; + idx_t total_string_length = 0; + idx_t total_count = 0; }; } // namespace dict_fsst diff --git a/src/include/duckdb/storage/compression/dict_fsst/common.hpp b/src/include/duckdb/storage/compression/dict_fsst/common.hpp index 7aadaa07b1e..8b6dd129d6b 100644 --- a/src/include/duckdb/storage/compression/dict_fsst/common.hpp +++ b/src/include/duckdb/storage/compression/dict_fsst/common.hpp @@ -38,6 +38,7 @@ struct DictFSSTCompression { public: //! Dictionary header size at the beginning of the string segment (offset + length) static constexpr uint16_t DICTIONARY_HEADER_SIZE = sizeof(dict_fsst_compression_header_t); + static constexpr idx_t STRING_SIZE_LIMIT = 16384; public: static bool HasEnoughSpace(idx_t current_count, idx_t index_count, idx_t dict_size, diff --git a/src/include/duckdb/storage/compression/dict_fsst/compression.hpp b/src/include/duckdb/storage/compression/dict_fsst/compression.hpp index 8b922925bb2..2d5df59b0a2 100644 --- a/src/include/duckdb/storage/compression/dict_fsst/compression.hpp +++ b/src/include/duckdb/storage/compression/dict_fsst/compression.hpp @@ -2,6 +2,7 @@ #include "duckdb/common/typedefs.hpp" #include "duckdb/storage/compression/dict_fsst/common.hpp" +#include "duckdb/storage/compression/dict_fsst/analyze.hpp" #include "duckdb/function/compression_function.hpp" #include "duckdb/common/string_map_set.hpp" #include "duckdb/storage/table/column_data_checkpointer.hpp" @@ -19,66 +20,69 @@ namespace dict_fsst { // scanning the whole dictionary at once and then scanning the selection buffer for each emitted vector. Secondly, it // allows for efficient bitpacking compression as the selection values should remain relatively small. -struct EncodedInputData { -public: - explicit EncodedInputData(Allocator &allocator) : heap(allocator) { - } - -public: - void Reset() { - input_data.clear(); - heap.Destroy(); - } - -public: - StringHeap heap; - vector input_data; -}; - //===--------------------------------------------------------------------===// // Compress //===--------------------------------------------------------------------===// -struct DictFSSTCompressionCompressState : public DictFSSTCompressionState { +struct DictFSSTCompressionCompressState : public CompressionState { public: - DictFSSTCompressionCompressState(ColumnDataCheckpointData &checkpoint_data_p, const CompressionInfo &info); + DictFSSTCompressionCompressState(ColumnDataCheckpointData &checkpoint_data_p, + unique_ptr &&state); ~DictFSSTCompressionCompressState() override; public: void CreateEmptySegment(idx_t row_start); - void Verify() override; - optional_idx LookupString(const string_t &str) override; - void AddNewString(const StringData &str) override; - void AddNull() override; - void AddLookup(uint32_t lookup_result) override; - idx_t RequiredSpace(bool new_string, idx_t string_size) override; - void Flush(bool final = false) override; - void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) override; - bool EncodeDictionary() override; - StringData GetString(const string_t *strings, idx_t index, idx_t raw_index) override; idx_t Finalize(); + void Compress(Vector &scan_vector, idx_t count); + void FinalizeCompress(); + void Flush(); + public: ColumnDataCheckpointData &checkpoint_data; CompressionFunction &function; - // State regarding current segment unique_ptr current_segment; BufferHandle current_handle; StringDictionaryContainer current_dictionary; - data_ptr_t current_end_ptr; + //! Offset at which to write the next dictionary string + idx_t dictionary_offset = 0; + +public: + idx_t string_lengths_space; + vector string_lengths; + idx_t dict_count = 0; + bitpacking_width_t string_lengths_width = 0; + //! For DICT_FSST we delay encoding of new entries, which means + // we have to prepare for strings being exploded in size by max 2x + // so we have to muddy out 'string_lengths_width' + bitpacking_width_t real_string_lengths_width = 0; + uint32_t max_string_length = 0; - // Buffers and map for current segment + idx_t dictionary_indices_space; + vector dictionary_indices; + bitpacking_width_t dictionary_indices_width = 0; + //! uint32_t max_dictionary_index; (this is 'dict_count') + + //! string -> dictionary_index (for lookups) string_map_t current_string_map; - vector dictionary_string_lengths; - uint32_t max_length = 0; - bitpacking_width_t string_length_bitwidth = 0; + //! strings added to the dictionary waiting to be encoded + vector dictionary_encoding_buffer; + //! for DICT_FSST we store uncompressed strings in the 'current_string_map', this owns that memory + StringHeap uncompressed_dictionary_copy; - vector selection_buffer; + //! This is used for FSST_ONLY, to store the memory of the encoded input + unsafe_unique_array encoding_buffer; + idx_t encoding_buffer_size = 0; - bitpacking_width_t current_width = 0; - bitpacking_width_t next_width = 0; +public: + void *encoder = nullptr; + idx_t symbol_table_size = DConstants::INVALID_INDEX; + DictionaryAppendState append_state = DictionaryAppendState::REGULAR; + bool all_unique = true; - EncodedInputData encoded_input; +public: + idx_t tuple_count = 0; + unique_ptr analyze; }; } // namespace dict_fsst diff --git a/src/storage/compression/dict_fsst.cpp b/src/storage/compression/dict_fsst.cpp index b2b69eb83d3..7957422ab21 100644 --- a/src/storage/compression/dict_fsst.cpp +++ b/src/storage/compression/dict_fsst.cpp @@ -70,20 +70,17 @@ struct DictFSSTCompressionStorage { //===--------------------------------------------------------------------===// unique_ptr DictFSSTCompressionStorage::StringInitAnalyze(ColumnData &col_data, PhysicalType type) { CompressionInfo info(col_data.GetBlockManager().GetBlockSize()); - return make_uniq(info); + return make_uniq(info); } bool DictFSSTCompressionStorage::StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count) { - auto &state = state_p.Cast(); - return state.analyze_state->UpdateState(input, count); + auto &analyze_state = state_p.Cast(); + return analyze_state.Analyze(input, count); } idx_t DictFSSTCompressionStorage::StringFinalAnalyze(AnalyzeState &state_p) { - auto &analyze_state = state_p.Cast(); - auto &state = *analyze_state.analyze_state; - state.Flush(); - - return state.total_space; + auto &analyze_state = state_p.Cast(); + return analyze_state.FinalAnalyze(); } //===--------------------------------------------------------------------===// @@ -91,7 +88,8 @@ idx_t DictFSSTCompressionStorage::StringFinalAnalyze(AnalyzeState &state_p) { //===--------------------------------------------------------------------===// unique_ptr DictFSSTCompressionStorage::InitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr state) { - return make_uniq(checkpoint_data, state->info); + return make_uniq( + checkpoint_data, unique_ptr_cast(std::move(state))); } void DictFSSTCompressionStorage::Compress(CompressionState &state_p, Vector &scan_vector, idx_t count) { diff --git a/src/storage/compression/dict_fsst/analyze.cpp b/src/storage/compression/dict_fsst/analyze.cpp index 598867ad43e..40bcf8348ad 100644 --- a/src/storage/compression/dict_fsst/analyze.cpp +++ b/src/storage/compression/dict_fsst/analyze.cpp @@ -4,210 +4,37 @@ namespace duckdb { namespace dict_fsst { -DictFSSTAnalyzeState::DictFSSTAnalyzeState(const CompressionInfo &info) - : DictFSSTCompressionState(info), segment_count(0), current_tuple_count(0), current_unique_count(0), - current_dict_size(0), current_width(0), next_width(0) { -} - -DictFSSTAnalyzeState::~DictFSSTAnalyzeState() { - if (encoder) { - auto fsst_encoder = reinterpret_cast(encoder); - duckdb_fsst_destroy(fsst_encoder); - } -} - -optional_idx DictFSSTAnalyzeState::LookupString(const string_t &str) { - if (append_state == DictionaryAppendState::ENCODED_ALL_UNIQUE) { - //! In this mode we omit the selection buffer, storing (possible) duplicates in the dictionary - return optional_idx(); - } - return current_string_map.count(str) ? 1 : optional_idx(); -} - -void DictFSSTAnalyzeState::AddNewString(const StringData &string_data) { - D_ASSERT(!string_data.encoded_string); - auto &str = string_data.string; - - current_tuple_count++; - current_unique_count++; - current_dict_size += str.GetSize(); - - uint32_t string_length = UnsafeNumericCast(string_data.string.GetSize()); - if (IsEncoded()) { - //! Optimistic assumption about the compressed length; - string_length /= 2; - } - - if (string_length > max_length) { - max_length = string_length; - string_length_bitwidth = BitpackingPrimitives::MinimumBitWidth(string_length); - } - - if (str.IsInlined()) { - current_string_map.emplace(str, string_length); - } else { - current_string_map.emplace(heap.AddBlob(str), string_length); - } - if (append_state != DictionaryAppendState::ENCODED_ALL_UNIQUE) { - current_width = next_width; - } -} - -void DictFSSTAnalyzeState::AddLookup(uint32_t) { - current_tuple_count++; -} - -void DictFSSTAnalyzeState::AddNull() { - //! With FSST_ONLY we can't store validity, so we can only use this mode when no validity is required (all are - //! non-null). - all_unique = false; - current_tuple_count++; -} - -void DictFSSTAnalyzeState::EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) { - return; -} - -bool DictFSSTAnalyzeState::EncodeDictionary() { - if (current_dict_size < DICTIONARY_ENCODE_THRESHOLD) { - append_state = DictionaryAppendState::NOT_ENCODED; - return false; - } - - vector fsst_string_sizes; - vector fsst_string_ptrs; - - // Skip index 0, that's reserved for NULL - for (auto &pair : current_string_map) { - auto &str = pair.first; - fsst_string_sizes.push_back(str.GetSize()); - fsst_string_ptrs.push_back((unsigned char *)str.GetData()); // NOLINT - } - - // Create the encoder - auto string_count = current_string_map.size(); - encoder = - reinterpret_cast(duckdb_fsst_create(string_count, &fsst_string_sizes[0], &fsst_string_ptrs[0], 0)); - auto fsst_encoder = reinterpret_cast(encoder); - - size_t output_buffer_size = 7 + 2 * current_dict_size; // size as specified in fsst.h - idx_t required_size = output_buffer_size; - required_size = AlignValue(required_size); - required_size += sizeof(unsigned char *) * string_count; - D_ASSERT(ValueIsAligned(required_size)); - required_size += sizeof(size_t) * string_count; - - if (!compression_buffer || required_size > compression_buffer_size) { - compression_buffer = make_unsafe_uniq_array_uninitialized(required_size); - compression_buffer_size = required_size; - } - auto compressed_ptrs = AlignPointer(compression_buffer.get() + output_buffer_size); - auto compressed_sizes = compressed_ptrs + (sizeof(unsigned char *) * string_count); - - // Compress the dictionary - auto res = - duckdb_fsst_compress(fsst_encoder, string_count, &fsst_string_sizes[0], &fsst_string_ptrs[0], - output_buffer_size, compression_buffer.get(), reinterpret_cast(compressed_sizes), - reinterpret_cast(compressed_ptrs)); - if (res != string_count) { - throw FatalException("FSST compression failed to compress all dictionary strings"); - } - - idx_t new_size = 0; - for (idx_t i = 0; i < string_count; i++) { - new_size += compressed_sizes[i]; - } - if (new_size > current_dict_size + DICTIONARY_ENCODE_THRESHOLD) { - // The dictionary does not compress well enough to use FSST - // continue filling the remaining bytes without encoding - duckdb_fsst_destroy(fsst_encoder); - encoder = nullptr; - append_state = DictionaryAppendState::NOT_ENCODED; - return false; - } - - max_length = 0; - for (idx_t i = 0; i < string_count; i++) { - uint32_t size = UnsafeNumericCast(compressed_sizes[i]); - if (size > max_length) { - max_length = size; +DictFSSTAnalyzeState::DictFSSTAnalyzeState(const CompressionInfo &info) : DictFSSTCompressionState(info) { +} + +bool DictFSSTAnalyzeState::Analyze(Vector &input, idx_t count) { + UnifiedVectorFormat vector_format; + input.ToUnifiedFormat(count, vector_format); + auto strings = vector_format.GetData(vector_format); + + for (idx_t i = 0; i < count; i++) { + auto idx = vector_format.sel->get_index(i); + if (!vector_format.validity.RowIsValid(i)) { + contains_nulls = true; + } else { + auto &str = strings[idx]; + auto str_len = str.GetSize(); + total_string_length += str_len; + if (str_len > max_string_length) { + max_string_length = str_len; + } + if (str_len >= DictFSSTCompression::STRING_SIZE_LIMIT) { + //! This string is too long, we don't want to use DICT_FSST for this rowgroup + return false; + } } - auto uncompressed_str = string_t((const char *)fsst_string_ptrs[i], (uint32_t)fsst_string_sizes[i]); // NOLINT - current_string_map[uncompressed_str] = size; - } - string_length_bitwidth = BitpackingPrimitives::MinimumBitWidth(max_length); - current_dict_size = new_size; - - if (all_unique) { - append_state = DictionaryAppendState::ENCODED_ALL_UNIQUE; - current_width = 0; - } else { - append_state = DictionaryAppendState::ENCODED; } + total_count += count; return true; } -StringData DictFSSTAnalyzeState::GetString(const string_t *strings, idx_t index, idx_t raw_index) { - return StringData(strings[index]); -} - -idx_t DictFSSTAnalyzeState::RequiredSpace(bool new_string, idx_t string_size) { - idx_t required_space = 0; - if (IsEncoded()) { - required_space += symbol_table_size; - } - - if (!new_string) { - required_space += DictFSSTCompression::RequiredSpace(current_tuple_count + 1, current_unique_count, - current_dict_size, current_width, string_length_bitwidth); - } else { - next_width = BitpackingPrimitives::MinimumBitWidth(current_unique_count + 2); // 1 for null, one for new string - if (append_state == DictionaryAppendState::ENCODED_ALL_UNIQUE) { - next_width = 0; - } - required_space += - DictFSSTCompression::RequiredSpace(current_tuple_count + 1, current_unique_count + 1, - current_dict_size + string_size, next_width, string_length_bitwidth); - } - return required_space; -} - -void DictFSSTAnalyzeState::Flush(bool final) { - if (!current_tuple_count) { - D_ASSERT(!current_unique_count); - D_ASSERT(!current_dict_size); - D_ASSERT(current_string_map.empty()); - return; - } - idx_t required_space = 0; - if (IsEncoded()) { - required_space += symbol_table_size; - } - required_space += DictFSSTCompression::RequiredSpace(current_tuple_count, current_unique_count, current_dict_size, - current_width, string_length_bitwidth); - - total_space += required_space; - - segment_count++; - current_tuple_count = 0; - current_unique_count = 0; - current_dict_size = 0; - current_string_map.clear(); - max_length = 0; - - if (IsEncoded()) { - auto fsst_encoder = reinterpret_cast(encoder); - duckdb_fsst_destroy(fsst_encoder); - encoder = nullptr; - symbol_table_size = DConstants::INVALID_INDEX; - } - append_state = DictionaryAppendState::REGULAR; - all_unique = true; - - heap.Destroy(); -} - -void DictFSSTAnalyzeState::Verify() { +idx_t DictFSSTAnalyzeState::FinalAnalyze() { + return LossyNumericCast((double)total_string_length / 2.0); } } // namespace dict_fsst diff --git a/src/storage/compression/dict_fsst/common.cpp b/src/storage/compression/dict_fsst/common.cpp index df5345721b7..7318781fa52 100644 --- a/src/storage/compression/dict_fsst/common.cpp +++ b/src/storage/compression/dict_fsst/common.cpp @@ -98,7 +98,6 @@ bool DictFSSTCompressionState::UpdateState(Vector &scan_vector, idx_t count) { EncodeInputStrings(vdata, count); } - static constexpr idx_t STRING_SIZE_LIMIT = 16384; for (idx_t i = 0; i < count; i++) { auto idx = vdata.sel->get_index(i); idx_t string_size = 0; @@ -110,12 +109,12 @@ bool DictFSSTCompressionState::UpdateState(Vector &scan_vector, idx_t count) { auto &str = string_data.Get(); string_size = str.GetSize(); if (!string_data.encoded_string) { - if (string_size >= STRING_SIZE_LIMIT / 2) { + if (string_size >= DictFSSTCompression::STRING_SIZE_LIMIT / 2) { // This string could potentially expand by 2x when encoded by FSST return false; } } else { - if (string_size >= STRING_SIZE_LIMIT) { + if (string_size >= DictFSSTCompression::STRING_SIZE_LIMIT) { throw FatalException("Encoded string expanded by more than 2x somehow!?"); } } diff --git a/src/storage/compression/dict_fsst/compression.cpp b/src/storage/compression/dict_fsst/compression.cpp index 2d1870596ac..8b3fbf00110 100644 --- a/src/storage/compression/dict_fsst/compression.cpp +++ b/src/storage/compression/dict_fsst/compression.cpp @@ -6,10 +6,9 @@ namespace duckdb { namespace dict_fsst { DictFSSTCompressionCompressState::DictFSSTCompressionCompressState(ColumnDataCheckpointData &checkpoint_data_p, - const CompressionInfo &info) - : DictFSSTCompressionState(info), checkpoint_data(checkpoint_data_p), - function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_DICT_FSST)), - encoded_input(BufferAllocator::Get(checkpoint_data.GetDatabase())) { + unique_ptr &&analyze) + : CompressionState(analyze->info), checkpoint_data(checkpoint_data_p), + function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_DICT_FSST)) { CreateEmptySegment(checkpoint_data.GetRowGroup().start); } @@ -20,6 +19,295 @@ DictFSSTCompressionCompressState::~DictFSSTCompressionCompressState() { } } +static constexpr uint16_t DICTIONARY_HEADER_SIZE = sizeof(dict_fsst_compression_header_t); +static constexpr idx_t STRING_SIZE_LIMIT = 16384; +static constexpr uint16_t FSST_SYMBOL_TABLE_SIZE = sizeof(duckdb_fsst_decoder_t); + +static inline bool IsEncoded(DictionaryAppendState state) { + return state == DictionaryAppendState::ENCODED || state == DictionaryAppendState::ENCODED_ALL_UNIQUE; +} + +void DictFSSTCompressionCompressState::Flush() { + current_segment->count = tuple_count; + + // Reset the state + append_state = DictionaryAppendState::REGULAR; + string_lengths_width = 0; + uncompressed_dictionary_copy.Destroy(); + //! This should already be empty at this point, otherwise that means that strings are not encoded / not added to the + //! dictionary + D_ASSERT(dictionary_encoding_buffer.empty()); + current_string_map.clear(); + + string_lengths.clear(); + max_string_length = 0; + + dictionary_indices.clear(); + + all_unique = true; + auto fsst_encoder = reinterpret_cast(encoder); + duckdb_fsst_destroy(fsst_encoder); + encoder = nullptr; + symbol_table_size = DConstants::INVALID_INDEX; + + tuple_count = 0; + dict_count = 0; + + string_lengths.push_back(0); + dict_count++; +} + +bool RequiresHigherBitWidth(bitpacking_width_t bitwidth, uint32_t other) { + return other >= (1 << bitwidth); +} + +static inline bool AddLookup(DictFSSTCompressionCompressState &state, idx_t lookup, + const bool recalculate_indices_space) { + D_ASSERT(lookup != DConstants::INVALID_INDEX); + + //! This string exists in the dictionary + idx_t new_dictionary_indices_space = state.dictionary_indices_space; + if (recalculate_indices_space) { + new_dictionary_indices_space = + BitpackingPrimitives::GetRequiredSize(state.tuple_count + 1, state.dictionary_indices_width); + } + + idx_t required_space = 0; + required_space += sizeof(dict_fsst_compression_header_t); + required_space = AlignValue(required_space); + required_space += state.current_dictionary.size; + required_space = AlignValue(required_space); + required_space += new_dictionary_indices_space; + required_space = AlignValue(required_space); + required_space += state.string_lengths_space; + + idx_t available_space = state.info.GetBlockSize(); + if (state.append_state == DictionaryAppendState::REGULAR) { + available_space -= FSST_SYMBOL_TABLE_SIZE; + } + if (required_space > available_space) { + return false; + } + + state.all_unique = false; + // Exists in the dictionary, add it + state.dictionary_indices.push_back(lookup); + state.tuple_count++; + return true; +} + +template +static inline bool AddToDictionary(DictFSSTCompressionCompressState &state, const string_t &str, + const bool recalculate_indices_space) { + auto str_len = str.GetSize(); + if (APPEND_STATE == DictionaryAppendState::ENCODED) { + //! We delay encoding of new entries. + // Encoding can increase the size of the string by 2x max, so we prepare for this worst case scenario. + str_len *= 2; + } + + const bool requires_higher_strlen_bitwidth = RequiresHigherBitWidth(state.string_lengths_width, str_len); + const bool requires_higher_indices_bitwidth = + RequiresHigherBitWidth(state.dictionary_indices_width, state.dict_count + 1); + // We round the required size up to bitpacking group sizes anyways, so we only have to recalculate every 32 values + const bool recalculate_strlen_space = + ((state.dict_count + 1) % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) == 1; + + //! String Lengths + bitpacking_width_t new_string_lengths_width = state.string_lengths_width; + idx_t new_string_lengths_space = state.string_lengths_space; + if (requires_higher_strlen_bitwidth) { + new_string_lengths_width = BitpackingPrimitives::MinimumBitWidth(str_len); + } + if (requires_higher_strlen_bitwidth || recalculate_strlen_space) { + new_string_lengths_space = + BitpackingPrimitives::GetRequiredSize(state.dict_count + 1, new_string_lengths_width); + } + + //! Dictionary Indices + bitpacking_width_t new_dictionary_indices_width = state.dictionary_indices_width; + idx_t new_dictionary_indices_space = state.dictionary_indices_space; + if (requires_higher_indices_bitwidth) { + new_dictionary_indices_width = BitpackingPrimitives::MinimumBitWidth(state.dict_count + 1); + } + if (requires_higher_indices_bitwidth || recalculate_indices_space) { + new_dictionary_indices_space = + BitpackingPrimitives::GetRequiredSize(state.tuple_count + 1, new_dictionary_indices_width); + } + + idx_t required_space = 0; + required_space += sizeof(dict_fsst_compression_header_t); + required_space = AlignValue(required_space); + if (APPEND_STATE == DictionaryAppendState::ENCODED) { + required_space += state.current_dictionary.size + str_len; + for (auto &uncompressed_string : state.dictionary_encoding_buffer) { + required_space += uncompressed_string.GetSize() * 2; + } + } else { + required_space += state.current_dictionary.size + str_len; + } + required_space = AlignValue(required_space); + required_space += new_dictionary_indices_space; + required_space = AlignValue(required_space); + required_space += new_string_lengths_space; + + idx_t available_space = state.info.GetBlockSize(); + if (state.append_state == DictionaryAppendState::REGULAR) { + available_space -= FSST_SYMBOL_TABLE_SIZE; + } + if (required_space > available_space) { + return false; + } + + // Add it to the dictionary + state.dictionary_indices.push_back(state.dict_count); + if (APPEND_STATE == DictionaryAppendState::ENCODED) { + if (str.IsInlined()) { + state.dictionary_encoding_buffer.push_back(str); + } else { + state.dictionary_encoding_buffer.push_back(state.uncompressed_dictionary_copy.AddString(str)); + } + auto &uncompressed_string = state.dictionary_encoding_buffer.back(); + state.current_string_map[uncompressed_string] = state.dict_count; + } else { + state.string_lengths.push_back(str_len); + auto baseptr = + AlignPointer(state.current_handle.Ptr() + sizeof(dict_fsst_compression_header_t)); + memcpy(baseptr + state.dictionary_offset, str.GetData(), str_len); + state.dictionary_offset += str_len; + string_t dictionary_string((const char *)(baseptr + state.dictionary_offset), str_len); // NOLINT + state.current_string_map[dictionary_string] = state.dict_count; + } + state.dict_count++; + + //! Update the state for serializing the dictionary_indices + string_lengths + if (requires_higher_strlen_bitwidth) { + D_ASSERT(str_len > state.max_string_length); + state.string_lengths_width = new_string_lengths_width; + state.max_string_length = str_len; + } + if (requires_higher_strlen_bitwidth || recalculate_strlen_space) { + state.string_lengths_space = new_string_lengths_space; + } + + if (requires_higher_indices_bitwidth) { + state.dictionary_indices_width = new_dictionary_indices_width; + } + if (requires_higher_indices_bitwidth || recalculate_indices_space) { + state.dictionary_indices_space = new_dictionary_indices_space; + } + state.tuple_count++; + return true; +} + +void DictFSSTCompressionCompressState::Compress(Vector &scan_vector, idx_t count) { + UnifiedVectorFormat vector_format; + scan_vector.ToUnifiedFormat(count, vector_format); + auto strings = vector_format.GetData(vector_format); + //! If the append_mode is FSST_ONLY we will encode all input + // this memory is owned by a reusable buffer stored in the state + vector encoded_input; + //! The index at which we started encoding the input + // in case we switch to FSST_ONLY in the middle, we can avoid encoding the previous input strings + idx_t encoding_offset = 0; + + for (idx_t i = 0; i < count; i++) { + auto idx = vector_format.sel->get_index(i); + bool is_not_null = vector_format.validity.RowIsValid(idx); + + idx_t lookup = DConstants::INVALID_INDEX; + auto &str = strings[idx]; + + bool fits = false; + for (idx_t check = 0; check < 3 && !fits; check++) { + switch (append_state) { + case DictionaryAppendState::NOT_ENCODED: + case DictionaryAppendState::REGULAR: { + if (is_not_null) { + auto it = current_string_map.find(str); + lookup = it == current_string_map.end() ? DConstants::INVALID_INDEX : it->second; + } else { + lookup = 0; + } + + const bool recalculate_indices_space = + ((tuple_count + 1) % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) == 1; + if (lookup != DConstants::INVALID_INDEX) { + fits = AddLookup(*this, lookup, recalculate_indices_space); + } else { + //! This string does not exist in the dictionary, add it + auto &str = strings[idx]; + // FIXME: if/else to call with NOT_ENCODED if needed?? + fits = AddToDictionary(*this, str, recalculate_indices_space); + } + break; + } + case DictionaryAppendState::ENCODED: { + // Don't encode the input, the 'current_string_map' is not encoded. + // encoding of the dictionary is done lazily + // we optimize for the case where the strings are *already* in the dictionary + + if (is_not_null) { + auto it = current_string_map.find(str); + lookup = it == current_string_map.end() ? DConstants::INVALID_INDEX : it->second; + } else { + lookup = 0; + } + + const bool recalculate_indices_space = + ((tuple_count + 1) % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) == 1; + if (lookup != DConstants::INVALID_INDEX) { + fits = AddLookup(*this, lookup, recalculate_indices_space); + } else { + //! Not in the dictionary, add it + fits = AddToDictionary(*this, str, recalculate_indices_space); + } + + break; + } + case DictionaryAppendState::ENCODED_ALL_UNIQUE: { + // Encode the input upfront, the 'current_string_map' is also encoded. + // no lookups are performed, everything is added. + if (encoded_input.empty()) { + encoding_offset = i; + idx_t required_space = 0; + vector input_string_ptrs; + vector input_string_lengths; + for (idx_t j = i; j < count; j++) { + auto index = vector_format.sel->get_index(j); +#ifdef DEBUG + //! We only choose FSST_ONLY if the rowgroup doesn't contain any nulls + D_ASSERT(vector_format.validity.RowIsValid(index)); +#endif + auto &to_encode = strings[index]; + input_string_ptrs.push_back(to_encode.GetData()); + input_string_lengths.push_back(to_encode.GetSize()); + } + + // TODO: First encode the input + } + + const bool recalculate_indices_space = + ((tuple_count + 1) % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) == 1; + auto &string = encoded_input[i - encoding_offset]; + fits = AddToDictionary(*this, string, + recalculate_indices_space); + break; + } + } + } + if (!fits) { + throw InternalException("This should not happen, a possible infinite loop was prevented"); + } + } +} + +void DictFSSTCompressionCompressState::FinalizeCompress() { + throw NotImplementedException("TODO"); +} + +// ------------------ OLD ---------------------------------------------------------------------- + void DictFSSTCompressionCompressState::CreateEmptySegment(idx_t row_start) { auto &db = checkpoint_data.GetDatabase(); auto &type = checkpoint_data.GetType(); @@ -30,20 +318,19 @@ void DictFSSTCompressionCompressState::CreateEmptySegment(idx_t row_start) { // Reset the buffers and the string map. current_string_map.clear(); - dictionary_string_lengths.clear(); + string_lengths.clear(); // Reserve index 0 for null strings. - dictionary_string_lengths.push_back(0); - selection_buffer.clear(); - - current_width = 0; - next_width = 0; + string_lengths.push_back(0); + dict_count++; + dictionary_indices.clear(); // Reset the pointers into the current segment. auto &buffer_manager = BufferManager::GetBufferManager(checkpoint_data.GetDatabase()); current_handle = buffer_manager.Pin(current_segment->block); - current_dictionary = DictFSSTCompression::GetDictionary(*current_segment, current_handle); - current_end_ptr = current_handle.Ptr() + current_dictionary.end; + + dictionary_offset = 0; + // current_dictionary = DictFSSTCompression::GetDictionary(*current_segment, current_handle); } void DictFSSTCompressionCompressState::Verify() {