diff --git a/src/include/duckdb/storage/compression/dict_fsst/common.hpp b/src/include/duckdb/storage/compression/dict_fsst/common.hpp index 6c5d39e9a64f..fcaa70e74a1d 100644 --- a/src/include/duckdb/storage/compression/dict_fsst/common.hpp +++ b/src/include/duckdb/storage/compression/dict_fsst/common.hpp @@ -19,12 +19,11 @@ enum class DictFSSTMode : uint8_t { typedef struct { uint32_t dict_size; - uint32_t dict_end; - uint32_t string_lengths_offset; - uint32_t string_lengths_width; uint32_t dict_count; - uint32_t dictionary_indices_width; DictFSSTMode mode; + uint8_t string_lengths_width; + uint8_t dictionary_indices_width; + uint32_t symbol_table_size; } dict_fsst_compression_header_t; enum class DictionaryAppendState : uint8_t { diff --git a/src/include/duckdb/storage/compression/dict_fsst/decompression.hpp b/src/include/duckdb/storage/compression/dict_fsst/decompression.hpp index 2a05fbe15807..e590a4fedd06 100644 --- a/src/include/duckdb/storage/compression/dict_fsst/decompression.hpp +++ b/src/include/duckdb/storage/compression/dict_fsst/decompression.hpp @@ -9,18 +9,19 @@ namespace dict_fsst { //===--------------------------------------------------------------------===// // Scan //===--------------------------------------------------------------------===// -// FIXME: why is this StringScanState when we also define: `BufferHandle handle` ??? -struct CompressedStringScanState : public StringScanState { +struct CompressedStringScanState : public SegmentScanState { public: - explicit CompressedStringScanState(BufferHandle &&handle_p) - : StringScanState(), owned_handle(std::move(handle_p)), handle(owned_handle) { + CompressedStringScanState(ColumnSegment &segment, BufferHandle &&handle_p) + : segment(segment), owned_handle(std::move(handle_p)), handle(owned_handle) { } - explicit CompressedStringScanState(BufferHandle &handle_p) : StringScanState(), owned_handle(), handle(handle_p) { + CompressedStringScanState(ColumnSegment &segment, BufferHandle &handle_p) + : segment(segment), owned_handle(), handle(handle_p) { } + ~CompressedStringScanState() override; public: - void Initialize(ColumnSegment &segment, bool initialize_dictionary = true); + void Initialize(bool initialize_dictionary = true); void ScanToFlatVector(Vector &result, idx_t result_offset, idx_t start, idx_t scan_count); void ScanToDictionaryVector(ColumnSegment &segment, Vector &result, idx_t result_offset, idx_t start, idx_t scan_count); @@ -31,27 +32,29 @@ struct CompressedStringScanState : public StringScanState { uint32_t GetStringLength(sel_t index); public: + ColumnSegment &segment; BufferHandle owned_handle; optional_ptr handle; - bitpacking_width_t current_width; + DictFSSTMode mode; + idx_t dictionary_size; + uint32_t dict_count; + bitpacking_width_t dictionary_indices_width; + bitpacking_width_t string_lengths_width; + buffer_ptr sel_vec; - vector string_lengths; idx_t sel_vec_size = 0; - DictFSSTMode mode; + + vector string_lengths; //! Start of the block (pointing to the dictionary_header) data_ptr_t baseptr; - //! Start of the data (pointing to the start of the selection buffer) - data_ptr_t base_data; + data_ptr_t dict_ptr; + data_ptr_t dictionary_indices_ptr; data_ptr_t string_lengths_ptr; - bitpacking_width_t string_lengths_width; - uint32_t dict_count; buffer_ptr dictionary; - idx_t dictionary_size; StringDictionaryContainer dict; - idx_t block_size; void *decoder = nullptr; vector decompress_buffer; diff --git a/src/storage/compression/dict_fsst.cpp b/src/storage/compression/dict_fsst.cpp index 94adfaa9a987..0d81e36ba752 100644 --- a/src/storage/compression/dict_fsst.cpp +++ b/src/storage/compression/dict_fsst.cpp @@ -107,8 +107,8 @@ void DictFSSTCompressionStorage::FinalizeCompress(CompressionState &state_p) { //===--------------------------------------------------------------------===// unique_ptr DictFSSTCompressionStorage::StringInitScan(ColumnSegment &segment) { auto &buffer_manager = BufferManager::GetBufferManager(segment.db); - auto state = make_uniq(buffer_manager.Pin(segment.block)); - state->Initialize(segment, true); + auto state = make_uniq(segment, buffer_manager.Pin(segment.block)); + state->Initialize(true); return std::move(state); } @@ -141,8 +141,8 @@ void DictFSSTCompressionStorage::StringScan(ColumnSegment &segment, ColumnScanSt void DictFSSTCompressionStorage::StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result, idx_t result_idx) { // fetch a single row from the string segment - CompressedStringScanState scan_state(state.GetOrInsertHandle(segment)); - scan_state.Initialize(segment, false); + CompressedStringScanState scan_state(segment, state.GetOrInsertHandle(segment)); + scan_state.Initialize(false); scan_state.ScanToFlatVector(result, result_idx, NumericCast(row_id), 1); } diff --git a/src/storage/compression/dict_fsst/common.cpp b/src/storage/compression/dict_fsst/common.cpp index 43c14ee7b3d1..b68a3f510166 100644 --- a/src/storage/compression/dict_fsst/common.cpp +++ b/src/storage/compression/dict_fsst/common.cpp @@ -13,7 +13,7 @@ StringDictionaryContainer DictFSSTCompression::GetDictionary(ColumnSegment &segm auto header_ptr = reinterpret_cast(handle.Ptr() + segment.GetBlockOffset()); StringDictionaryContainer container; container.size = Load(data_ptr_cast(&header_ptr->dict_size)); - container.end = Load(data_ptr_cast(&header_ptr->dict_end)); + container.end = container.size; return container; } @@ -21,7 +21,6 @@ void DictFSSTCompression::SetDictionary(ColumnSegment &segment, BufferHandle &ha StringDictionaryContainer container) { auto header_ptr = reinterpret_cast(handle.Ptr() + segment.GetBlockOffset()); Store(container.size, data_ptr_cast(&header_ptr->dict_size)); - Store(container.end, data_ptr_cast(&header_ptr->dict_end)); } // bool DictFSSTCompressionState::DryAppendToCurrentSegment(bool is_new, UnifiedVectorFormat &vdata, idx_t count, diff --git a/src/storage/compression/dict_fsst/compression.cpp b/src/storage/compression/dict_fsst/compression.cpp index 500bdc2a1ab3..65baa3d4ad6f 100644 --- a/src/storage/compression/dict_fsst/compression.cpp +++ b/src/storage/compression/dict_fsst/compression.cpp @@ -54,38 +54,41 @@ idx_t DictFSSTCompressionState::Finalize() { D_ASSERT(string_lengths_space == this->string_lengths_space); } #endif - auto total_size = DictFSSTCompression::DICTIONARY_HEADER_SIZE + dictionary_indices_space + string_lengths_space + - dictionary_offset; - if (is_fsst_encoded) { - total_size += symbol_table_size; + + if (!is_fsst_encoded) { + symbol_table_size = 0; } + const auto total_size = DictFSSTCompression::DICTIONARY_HEADER_SIZE + dictionary_indices_space + + string_lengths_space + dictionary_offset + symbol_table_size; // calculate ptr and offsets auto base_ptr = current_handle.Ptr(); auto header_ptr = reinterpret_cast(base_ptr); - auto offset_to_dictionary = AlignValue(DictFSSTCompression::DICTIONARY_HEADER_SIZE); - auto offset_to_dictionary_indices = offset_to_dictionary + dictionary_offset; + auto dictionary_dest = AlignValue(DictFSSTCompression::DICTIONARY_HEADER_SIZE); + auto symbol_table_dest = AlignValue(dictionary_dest + dictionary_offset); + auto string_lengths_dest = AlignValue(symbol_table_dest + symbol_table_size); + auto dictionary_indices_dest = AlignValue(string_lengths_dest + string_lengths_space); + + header_ptr->mode = ConvertToMode(append_state); + header_ptr->symbol_table_size = symbol_table_size; + header_ptr->dict_size = dictionary_offset; + header_ptr->dict_count = dict_count; + header_ptr->dictionary_indices_width = dictionary_indices_width; + header_ptr->string_lengths_width = string_lengths_width; + + // Write the symbol table if (is_fsst_encoded) { D_ASSERT(symbol_table_size != DConstants::INVALID_INDEX); - memcpy(base_ptr + offset_to_dictionary_indices, fsst_serialized_symbol_table.get(), symbol_table_size); - offset_to_dictionary_indices += symbol_table_size; + memcpy(base_ptr + symbol_table_dest, fsst_serialized_symbol_table.get(), symbol_table_size); } - offset_to_dictionary_indices = AlignValue(offset_to_dictionary_indices); - auto string_lengths_offset = offset_to_dictionary_indices + dictionary_indices_space; - header_ptr->mode = ConvertToMode(append_state); - // Write compressed selection buffer - BitpackingPrimitives::PackBuffer(base_ptr + offset_to_dictionary_indices, + // Write the string lengths of the dictionary + BitpackingPrimitives::PackBuffer(base_ptr + string_lengths_dest, string_lengths.data(), dict_count, + string_lengths_width); + // Write the dictionary indices (selection vector) + BitpackingPrimitives::PackBuffer(base_ptr + dictionary_indices_dest, (sel_t *)(dictionary_indices.data()), tuple_count, dictionary_indices_width); - BitpackingPrimitives::PackBuffer(base_ptr + string_lengths_offset, string_lengths.data(), - dict_count, string_lengths_width); - - // Store sizes and offsets in segment header - Store(NumericCast(string_lengths_offset), data_ptr_cast(&header_ptr->string_lengths_offset)); - Store(NumericCast(dict_count), data_ptr_cast(&header_ptr->dict_count)); - Store((uint32_t)dictionary_indices_width, data_ptr_cast(&header_ptr->dictionary_indices_width)); - Store((uint32_t)string_lengths_width, data_ptr_cast(&header_ptr->string_lengths_width)); if (append_state != DictionaryAppendState::ENCODED_ALL_UNIQUE) { D_ASSERT(dictionary_indices_width == BitpackingPrimitives::MinimumBitWidth(dict_count - 1)); @@ -107,9 +110,6 @@ idx_t DictFSSTCompressionState::Finalize() { D_ASSERT(info.GetBlockSize() >= required_space); #endif D_ASSERT((uint64_t)*max_element(std::begin(dictionary_indices), std::end(dictionary_indices)) == dict_count - 1); - - // Write the new dictionary with the updated "end". - DictFSSTCompression::SetDictionary(*current_segment, current_handle, current_dictionary); return total_size; } @@ -189,6 +189,10 @@ void DictFSSTCompressionState::Flush(bool final) { FlushEncodingBuffer(); } + if (!tuple_count) { + return; + } + current_segment->count = tuple_count; current_dictionary.size = dictionary_offset; current_dictionary.end = dictionary_offset; @@ -391,6 +395,11 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form idx_t lookup = DConstants::INVALID_INDEX; auto &str = strings[idx]; + //! In GetRequiredSize we will round up to ALGORITHM_GROUP_SIZE anyways + // so we can avoid recalculating for every tuple + const bool recalculate_indices_space = ((tuple_count % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) == 0); + // const bool recalculate_indices_space = true; + switch (append_state) { case DictionaryAppendState::NOT_ENCODED: case DictionaryAppendState::REGULAR: { @@ -401,8 +410,6 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form lookup = 0; } - const bool recalculate_indices_space = - ((tuple_count + 1) % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) == 1; if (append_state == DictionaryAppendState::REGULAR) { if (lookup != DConstants::INVALID_INDEX) { return AddLookup(*this, lookup, recalculate_indices_space); @@ -433,8 +440,6 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form lookup = 0; } - const bool recalculate_indices_space = - ((tuple_count + 1) % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) == 1; bool fits; if (lookup != DConstants::INVALID_INDEX) { fits = AddLookup(*this, lookup, recalculate_indices_space); @@ -486,6 +491,13 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form } // FIXME: can we compress directly to the segment? that would save a copy + // I think yes? + // We can give the segment as destination, and limit the size + // it will tell us when it can't fit everything + // worst case we can just check if the rest of the metadata fits when we remove the last string that it was + // able to encode I believe 'duckdb_fsst_compress' tells us how many of the input strings it was able to + // compress We can work backwards from there to see how many strings actually fit (probably worst case ret-1 + // ??) auto fsst_encoder = reinterpret_cast(encoder); auto res = duckdb_fsst_compress(fsst_encoder, input_string_lengths.size(), input_string_lengths.data(), input_string_ptrs.data(), output_buffer_size, encoding_buffer.get(), @@ -501,8 +513,6 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form } } - const bool recalculate_indices_space = - ((tuple_count + 1) % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) == 1; auto &string = encoded_input.data[i - encoded_input.offset]; return AddToDictionary(*this, string, recalculate_indices_space); } @@ -638,7 +648,7 @@ void DictFSSTCompressionState::Compress(Vector &scan_vector, idx_t count) { } void DictFSSTCompressionState::FinalizeCompress() { - throw NotImplementedException("TODO"); + Flush(true); } } // namespace dict_fsst diff --git a/src/storage/compression/dict_fsst/decompression.cpp b/src/storage/compression/dict_fsst/decompression.cpp index 550868b86f23..61cccf4fdd13 100644 --- a/src/storage/compression/dict_fsst/decompression.cpp +++ b/src/storage/compression/dict_fsst/decompression.cpp @@ -14,14 +14,13 @@ uint32_t CompressedStringScanState::GetStringLength(sel_t index) { } string_t CompressedStringScanState::FetchStringFromDict(Vector &result, int32_t dict_offset, uint32_t string_len) { - D_ASSERT(dict_offset >= 0 && dict_offset <= NumericCast(block_size)); + D_ASSERT(dict_offset >= 0 && dict_offset <= NumericCast(segment.GetBlockManager().GetBlockSize())); if (dict_offset == 0) { return string_t(nullptr, 0); } // normal string: read string from this block - auto dict_end = baseptr + dict.end; - auto dict_pos = dict_end - dict_offset; + auto dict_pos = dict_ptr + dict_offset; auto str_ptr = char_ptr_cast(dict_pos); switch (mode) { @@ -34,29 +33,11 @@ string_t CompressedStringScanState::FetchStringFromDict(Vector &result, int32_t } } -void CompressedStringScanState::Initialize(ColumnSegment &segment, bool initialize_dictionary) { +void CompressedStringScanState::Initialize(bool initialize_dictionary) { baseptr = handle->Ptr() + segment.GetBlockOffset(); // Load header values auto header_ptr = reinterpret_cast(baseptr); - auto string_lengths_offset = Load(data_ptr_cast(&header_ptr->string_lengths_offset)); - dict_count = Load(data_ptr_cast(&header_ptr->dict_count)); - current_width = (bitpacking_width_t)(Load(data_ptr_cast(&header_ptr->dictionary_indices_width))); - string_lengths_width = (bitpacking_width_t)(Load(data_ptr_cast(&header_ptr->string_lengths_width))); - string_lengths.resize(AlignValue(dict_count)); - auto string_lengths_size = BitpackingPrimitives::GetRequiredSize(dict_count, string_lengths_width); - - if (segment.GetBlockOffset() + string_lengths_offset + string_lengths_size + header_ptr->dict_size > - segment.GetBlockManager().GetBlockSize()) { - throw IOException( - "Failed to scan dictionary string - index was out of range. Database file appears to be corrupted."); - } - string_lengths_ptr = baseptr + string_lengths_offset; - base_data = data_ptr_cast(baseptr + sizeof(dict_fsst_compression_header_t)); - - block_size = segment.GetBlockManager().GetBlockSize(); - - dict = DictFSSTCompression::GetDictionary(segment, *handle); mode = header_ptr->mode; if (mode >= DictFSSTMode::COUNT) { throw FatalException("This block was written with a mode that is not recognized by this version, highest " @@ -64,12 +45,37 @@ void CompressedStringScanState::Initialize(ColumnSegment &segment, bool initiali static_cast(DictFSSTMode::COUNT), static_cast(mode)); } + dict_count = header_ptr->dict_count; + auto symbol_table_size = header_ptr->symbol_table_size; + dict = DictFSSTCompression::GetDictionary(segment, *handle); + + dictionary_indices_width = + (bitpacking_width_t)(Load(data_ptr_cast(&header_ptr->dictionary_indices_width))); + string_lengths_width = (bitpacking_width_t)(Load(data_ptr_cast(&header_ptr->string_lengths_width))); + + auto string_lengths_space = BitpackingPrimitives::GetRequiredSize(dict_count, string_lengths_width); + auto dictionary_indices_space = + BitpackingPrimitives::GetRequiredSize(segment.count.load(), dictionary_indices_width); + + auto dictionary_dest = AlignValue(DictFSSTCompression::DICTIONARY_HEADER_SIZE); + auto symbol_table_dest = AlignValue(dictionary_dest + dict.size); + auto string_lengths_dest = AlignValue(symbol_table_dest + symbol_table_size); + auto dictionary_indices_dest = AlignValue(string_lengths_dest + string_lengths_space); + + const auto total_space = segment.GetBlockOffset() + dictionary_indices_dest + dictionary_indices_space; + if (total_space > segment.GetBlockManager().GetBlockSize()) { + throw IOException( + "Failed to scan dictionary string - index was out of range. Database file appears to be corrupted."); + } + dict_ptr = data_ptr_cast(baseptr + dictionary_dest); + dictionary_indices_ptr = data_ptr_cast(baseptr + dictionary_indices_dest); + string_lengths_ptr = data_ptr_cast(baseptr + string_lengths_dest); + switch (mode) { case DictFSSTMode::FSST_ONLY: case DictFSSTMode::DICT_FSST: { decoder = new duckdb_fsst_decoder_t; - auto symbol_table_location = baseptr + dict.end; - auto ret = duckdb_fsst_import(reinterpret_cast(decoder), symbol_table_location); + auto ret = duckdb_fsst_import(reinterpret_cast(decoder), baseptr + symbol_table_dest); (void)(ret); D_ASSERT(ret != 0); // FIXME: the old code set the decoder to nullptr instead, why??? @@ -80,6 +86,8 @@ void CompressedStringScanState::Initialize(ColumnSegment &segment, bool initiali default: break; } + + string_lengths.resize(AlignValue(dict_count)); BitpackingPrimitives::UnPackBuffer(data_ptr_cast(string_lengths.data()), data_ptr_cast(string_lengths_ptr), dict_count, string_lengths_width); @@ -120,10 +128,10 @@ const SelectionVector &CompressedStringScanState::GetSelVec(idx_t start, idx_t s sel_vec = make_buffer(decompress_count); } - data_ptr_t sel_buf_src = &base_data[((start - start_offset) * current_width) / 8]; + data_ptr_t sel_buf_src = &dictionary_indices_ptr[((start - start_offset) * dictionary_indices_width) / 8]; sel_t *sel_vec_ptr = sel_vec->data(); BitpackingPrimitives::UnPackBuffer(data_ptr_cast(sel_vec_ptr), sel_buf_src, decompress_count, - current_width); + dictionary_indices_width); return *sel_vec; } }