From 7e704c1f8a54e080547707917e5b13c00ee341b5 Mon Sep 17 00:00:00 2001 From: Tishj Date: Wed, 8 Jan 2025 17:21:48 +0100 Subject: [PATCH] introduce StringData to hold the uncompressed string, together with an optional encoded string --- .../storage/compression/dict_fsst/analyze.hpp | 6 +-- .../storage/compression/dict_fsst/common.hpp | 26 ++++++++-- .../compression/dict_fsst/compression.hpp | 6 +-- src/storage/compression/dict_fsst/analyze.cpp | 11 ++-- src/storage/compression/dict_fsst/common.cpp | 17 ++++--- .../compression/dict_fsst/compression.cpp | 51 +++++++++++++------ 6 files changed, 82 insertions(+), 35 deletions(-) diff --git a/src/include/duckdb/storage/compression/dict_fsst/analyze.hpp b/src/include/duckdb/storage/compression/dict_fsst/analyze.hpp index 62c2ec5fd086..d3b9c95a0cb9 100644 --- a/src/include/duckdb/storage/compression/dict_fsst/analyze.hpp +++ b/src/include/duckdb/storage/compression/dict_fsst/analyze.hpp @@ -16,15 +16,15 @@ struct DictFSSTAnalyzeState : public DictFSSTCompressionState { explicit DictFSSTAnalyzeState(const CompressionInfo &info); public: - optional_idx LookupString(string_t str) override; - void AddNewString(string_t str) override; + optional_idx LookupString(const string_t &str) override; + void AddNewString(const StringData &str) override; void AddLookup(uint32_t lookup_result) override; void AddNull() override; idx_t RequiredSpace(bool new_string, idx_t string_size) override; void Flush(bool final = false) override; void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) override; bool EncodeDictionary() override; - const string_t &GetString(const string_t *strings, idx_t index, idx_t raw_index) override; + StringData GetString(const string_t *strings, idx_t index, idx_t raw_index) override; void Verify() override; public: diff --git a/src/include/duckdb/storage/compression/dict_fsst/common.hpp b/src/include/duckdb/storage/compression/dict_fsst/common.hpp index 56d2ad22e8d1..fd27d94a1eb6 100644 --- a/src/include/duckdb/storage/compression/dict_fsst/common.hpp +++ b/src/include/duckdb/storage/compression/dict_fsst/common.hpp @@ -40,6 +40,25 @@ struct DictFSSTCompression { static void SetDictionary(ColumnSegment &segment, BufferHandle &handle, StringDictionaryContainer container); }; +struct StringData { +public: + explicit StringData(const string_t &string, optional_ptr encoded_string = nullptr) + : string(string), encoded_string(encoded_string) { + } + +public: + const string_t &Get() const { + if (encoded_string) { + return *encoded_string; + } + return string; + } + +public: + const string_t &string; + optional_ptr encoded_string; +}; + //! Abstract class managing the compression state for size analysis or compression. class DictFSSTCompressionState : public CompressionState { public: @@ -53,22 +72,23 @@ class DictFSSTCompressionState : public CompressionState { // Should verify the State virtual void Verify() = 0; // Performs a lookup of str, storing the result internally - virtual optional_idx LookupString(string_t str) = 0; + virtual optional_idx LookupString(const string_t &str) = 0; // Add the most recently looked up str to compression state virtual void AddLookup(uint32_t lookup_result) = 0; // Add string to the state that is known to not be seen yet - virtual void AddNewString(string_t str) = 0; + virtual void AddNewString(const StringData &str) = 0; // Add a null value to the compression state virtual void AddNull() = 0; virtual idx_t RequiredSpace(bool new_string, idx_t string_size) = 0; // Flush the segment to disk if compressing or reset the counters if analyzing virtual void Flush(bool final = false) = 0; + virtual void UpdateStats(UnifiedVectorFormat &input, idx_t count) {/* no-op */}; // Process the strings of the vector if necessary virtual void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) = 0; // Encode the dictionary with FSST, return false if we decided not to encode virtual bool EncodeDictionary() = 0; // Retrieve the string given the indices - virtual const string_t &GetString(const string_t *strings, idx_t index, idx_t raw_index) = 0; + virtual StringData GetString(const string_t *strings, idx_t index, idx_t raw_index) = 0; private: bool DryAppendToCurrentSegment(bool is_new, UnifiedVectorFormat &vdata, idx_t count, idx_t index, idx_t raw_index); diff --git a/src/include/duckdb/storage/compression/dict_fsst/compression.hpp b/src/include/duckdb/storage/compression/dict_fsst/compression.hpp index 6b7b9c6313dd..44476fe57cda 100644 --- a/src/include/duckdb/storage/compression/dict_fsst/compression.hpp +++ b/src/include/duckdb/storage/compression/dict_fsst/compression.hpp @@ -49,15 +49,15 @@ struct DictFSSTCompressionCompressState : public DictFSSTCompressionState { public: void CreateEmptySegment(idx_t row_start); void Verify() override; - optional_idx LookupString(string_t str) override; - void AddNewString(string_t str) override; + optional_idx LookupString(const string_t &str) override; + void AddNewString(const StringData &str) override; void AddNull() override; void AddLookup(uint32_t lookup_result) override; idx_t RequiredSpace(bool new_string, idx_t string_size) override; void Flush(bool final = false) override; void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) override; bool EncodeDictionary() override; - const string_t &GetString(const string_t *strings, idx_t index, idx_t raw_index) override; + StringData GetString(const string_t *strings, idx_t index, idx_t raw_index) override; idx_t Finalize(); public: diff --git a/src/storage/compression/dict_fsst/analyze.cpp b/src/storage/compression/dict_fsst/analyze.cpp index 5c5d8d3cdbd5..964a71224780 100644 --- a/src/storage/compression/dict_fsst/analyze.cpp +++ b/src/storage/compression/dict_fsst/analyze.cpp @@ -8,11 +8,14 @@ DictFSSTAnalyzeState::DictFSSTAnalyzeState(const CompressionInfo &info) current_dict_size(0), current_width(0), next_width(0) { } -optional_idx DictFSSTAnalyzeState::LookupString(string_t str) { +optional_idx DictFSSTAnalyzeState::LookupString(const string_t &str) { return current_set.count(str) ? 1 : optional_idx(); } -void DictFSSTAnalyzeState::AddNewString(string_t str) { +void DictFSSTAnalyzeState::AddNewString(const StringData &string_data) { + D_ASSERT(!string_data.encoded_string); + auto &str = string_data.string; + current_tuple_count++; current_unique_count++; current_dict_size += str.GetSize(); @@ -40,8 +43,8 @@ bool DictFSSTAnalyzeState::EncodeDictionary() { return false; } -const string_t &DictFSSTAnalyzeState::GetString(const string_t *strings, idx_t index, idx_t raw_index) { - return strings[index]; +StringData DictFSSTAnalyzeState::GetString(const string_t *strings, idx_t index, idx_t raw_index) { + return StringData(strings[index]); } idx_t DictFSSTAnalyzeState::RequiredSpace(bool new_string, idx_t string_size) { diff --git a/src/storage/compression/dict_fsst/common.cpp b/src/storage/compression/dict_fsst/common.cpp index 4ea16dd1eabe..ee7f8c41b47a 100644 --- a/src/storage/compression/dict_fsst/common.cpp +++ b/src/storage/compression/dict_fsst/common.cpp @@ -48,8 +48,9 @@ DictFSSTCompressionState::~DictFSSTCompressionState() { bool DictFSSTCompressionState::DryAppendToCurrentSegment(bool is_new, UnifiedVectorFormat &vdata, idx_t count, idx_t index, idx_t raw_index) { auto strings = vdata.GetData(vdata); - reference str = GetString(strings, index, raw_index); - auto required_space = RequiredSpace(is_new, str.get().GetSize()); + auto string_data = GetString(strings, index, raw_index); + auto &str = string_data.Get(); + auto required_space = RequiredSpace(is_new, str.GetSize()); auto block_size = info.GetBlockSize(); @@ -60,8 +61,8 @@ bool DictFSSTCompressionState::DryAppendToCurrentSegment(bool is_new, UnifiedVec //! Decide whether or not to encode the dictionary if (EncodeDictionary()) { EncodeInputStrings(vdata, count); - str = GetString(strings, index, raw_index); - required_space = RequiredSpace(is_new, str.get().GetSize()); + auto encoded_string = GetString(strings, index, raw_index); + required_space = RequiredSpace(is_new, encoded_string.Get().GetSize()); if (required_space > block_size) { //! Even after encoding the dictionary, we can't add this string return false; @@ -97,9 +98,10 @@ bool DictFSSTCompressionState::UpdateState(Vector &scan_vector, idx_t count) { optional_idx lookup_result; auto row_is_valid = vdata.validity.RowIsValid(idx); - reference str = GetString(data, idx, i); + auto string_data = GetString(data, idx, i); if (row_is_valid) { - string_size = str.get().GetSize(); + auto &str = string_data.Get(); + string_size = str.GetSize(); if (string_size >= StringUncompressed::GetStringBlockLimit(info.GetBlockSize())) { // Big strings not implemented for dictionary compression return false; @@ -124,7 +126,8 @@ bool DictFSSTCompressionState::UpdateState(Vector &scan_vector, idx_t count) { } else if (lookup_result.IsValid()) { AddLookup(UnsafeNumericCast(lookup_result.GetIndex())); } else { - AddNewString(str); + auto string = GetString(data, idx, i); + AddNewString(string); } Verify(); diff --git a/src/storage/compression/dict_fsst/compression.cpp b/src/storage/compression/dict_fsst/compression.cpp index d44008c3e8fe..5fb84c7d0d61 100644 --- a/src/storage/compression/dict_fsst/compression.cpp +++ b/src/storage/compression/dict_fsst/compression.cpp @@ -51,11 +51,16 @@ void DictFSSTCompressionCompressState::Verify() { D_ASSERT(current_segment->count == selection_buffer.size()); D_ASSERT(DictFSSTCompression::HasEnoughSpace(current_segment->count.load(), index_buffer.size(), current_dictionary.size, current_width, info.GetBlockSize())); - D_ASSERT(current_dictionary.end == info.GetBlockSize()); + +#ifdef DEBUG + if (append_state != DictionaryAppendState::ENCODED) { + D_ASSERT(current_dictionary.end == info.GetBlockSize()); + } +#endif D_ASSERT(index_buffer.size() == current_string_map.size() + 1); // +1 is for null value } -optional_idx DictFSSTCompressionCompressState::LookupString(string_t str) { +optional_idx DictFSSTCompressionCompressState::LookupString(const string_t &str) { auto search = current_string_map.find(str); auto has_result = search != current_string_map.end(); @@ -65,9 +70,11 @@ optional_idx DictFSSTCompressionCompressState::LookupString(string_t str) { return search->second; } -void DictFSSTCompressionCompressState::AddNewString(string_t str) { - UncompressedStringStorage::UpdateStringStats(current_segment->stats, str); +void DictFSSTCompressionCompressState::AddNewString(const StringData &string_data) { + //! Update the stats using the uncompressed string always! + UncompressedStringStorage::UpdateStringStats(current_segment->stats, string_data.string); + auto &str = append_state == DictionaryAppendState::ENCODED ? *string_data.encoded_string : string_data.string; // Copy string to dict // New entries are added to the start (growing backwards) // [............xxxxooooooooo] @@ -78,7 +85,11 @@ void DictFSSTCompressionCompressState::AddNewString(string_t str) { auto dict_pos = current_end_ptr - current_dictionary.size; memcpy(dict_pos, str.GetData(), str.GetSize()); current_dictionary.Verify(info.GetBlockSize()); - D_ASSERT(current_dictionary.end == info.GetBlockSize()); +#ifdef DEBUG + if (append_state != DictionaryAppendState::ENCODED) { + D_ASSERT(current_dictionary.end == info.GetBlockSize()); + } +#endif // Update buffers and map index_buffer.push_back(current_dictionary.size); @@ -170,7 +181,7 @@ void DictFSSTCompressionCompressState::EncodeInputStrings(UnifiedVectorFormat &i for (idx_t i = 0; i < count; i++) { uint32_t size = UnsafeNumericCast(compressed_sizes[i]); string_t encoded_string((const char *)compressed_ptrs[i], size); // NOLINT; - strings.push_back(heap.AddString(std::move(encoded_string))); + strings.push_back(heap.AddBlob(std::move(encoded_string))); } } @@ -186,7 +197,10 @@ bool DictFSSTCompressionCompressState::EncodeDictionary() { data_ptr_t last_start = current_end_ptr; //! We could use the 'current_string_map' but this won't be in-order // and we want to preserve the order of the dictionary after rewriting - for (auto offset : index_buffer) { + + // Skip index 0, that's reserved for NULL + for (idx_t i = 1; i < index_buffer.size(); i++) { + auto offset = index_buffer[i]; auto start = current_end_ptr - offset; size_t size = UnsafeNumericCast(last_start - start); fsst_string_sizes.push_back(size); @@ -195,7 +209,7 @@ bool DictFSSTCompressionCompressState::EncodeDictionary() { } // Create the encoder - auto string_count = index_buffer.size(); + auto string_count = index_buffer.size() - 1; encoder = duckdb_fsst_create(string_count, &fsst_string_sizes[0], &fsst_string_ptrs[0], 0); auto fsst_encoder = (duckdb_fsst_encoder_t *)(encoder); @@ -225,10 +239,11 @@ bool DictFSSTCompressionCompressState::EncodeDictionary() { auto &start = compressed_ptrs[i]; auto &size = compressed_sizes[i]; offset += size; - index_buffer[i] = offset; + // Skip index 0, reserved for NULL + index_buffer[i + 1] = offset; auto dest = current_end_ptr - offset; memcpy(dest, start, size); - string_t dictionary_string((const char *)start, UnsafeNumericCast(size)); // NOLINT + string_t dictionary_string((const char *)dest, UnsafeNumericCast(size)); // NOLINT current_string_map.insert({dictionary_string, i}); } current_dictionary.size = offset; @@ -237,17 +252,23 @@ bool DictFSSTCompressionCompressState::EncodeDictionary() { return true; } -const string_t &DictFSSTCompressionCompressState::GetString(const string_t *strings, idx_t index, idx_t raw_index) { - if (append_state != DictionaryAppendState::ENCODED) { - return strings[index]; +StringData DictFSSTCompressionCompressState::GetString(const string_t *strings, idx_t index, idx_t raw_index) { + StringData result(strings[index]); + if (append_state == DictionaryAppendState::ENCODED) { + result.encoded_string = encoded_input.input_data[raw_index]; } - return encoded_input.input_data[raw_index]; + return result; } idx_t DictFSSTCompressionCompressState::Finalize() { auto &buffer_manager = BufferManager::GetBufferManager(checkpoint_data.GetDatabase()); auto handle = buffer_manager.Pin(current_segment->block); - D_ASSERT(current_dictionary.end == info.GetBlockSize()); + +#ifdef DEBUG + if (append_state != DictionaryAppendState::ENCODED) { + D_ASSERT(current_dictionary.end == info.GetBlockSize()); + } +#endif const bool is_fsst_encoded = append_state == DictionaryAppendState::ENCODED; // calculate sizes