diff --git a/src/function/compression_config.cpp b/src/function/compression_config.cpp index c0a7f1609972..d248f0bfd79f 100644 --- a/src/function/compression_config.cpp +++ b/src/function/compression_config.cpp @@ -101,4 +101,8 @@ optional_ptr DBConfig::GetCompressionFunction(CompressionTy return LoadCompressionFunction(*compression_functions, type, physical_type); } +CompressionFunction &DBConfig::GetEmptyValidity() { + return *empty_validity; +} + } // namespace duckdb diff --git a/src/include/duckdb/function/compression_function.hpp b/src/include/duckdb/function/compression_function.hpp index 9199f45ac341..0c36f586284d 100644 --- a/src/include/duckdb/function/compression_function.hpp +++ b/src/include/duckdb/function/compression_function.hpp @@ -203,6 +203,8 @@ typedef unique_ptr (*compression_deserialize_state_t)(Deseri //! Function prototype for cleaning up the segment state when the column data is dropped typedef void (*compression_cleanup_state_t)(ColumnSegment &segment); +enum class CompressionValidity : uint8_t { REQUIRES_VALIDITY, NO_VALIDITY_REQUIRED }; + class CompressionFunction { public: CompressionFunction(CompressionType type, PhysicalType data_type, compression_init_analyze_t init_analyze, @@ -297,6 +299,10 @@ class CompressionFunction { compression_deserialize_state_t deserialize_state; //! Cleanup the segment state (optional) compression_cleanup_state_t cleanup_state; + + //! Whether the validity mask should be separately compressed + //! or this compression function can also be used to decompress the validity + CompressionValidity validity = CompressionValidity::REQUIRES_VALIDITY; }; //! The set of compression functions diff --git a/src/include/duckdb/main/config.hpp b/src/include/duckdb/main/config.hpp index 604ef264b1e1..c1ba3c306e7b 100644 --- a/src/include/duckdb/main/config.hpp +++ b/src/include/duckdb/main/config.hpp @@ -33,6 +33,7 @@ #include "duckdb/parser/parser_extension.hpp" #include "duckdb/planner/operator_extension.hpp" #include "duckdb/storage/compression/bitpacking.hpp" +#include "duckdb/function/compression_function.hpp" #include "duckdb/function/encoding_function.hpp" namespace duckdb { @@ -362,6 +363,7 @@ struct DBConfig { //! Returns the compression function matching the compression and physical type. DUCKDB_API optional_ptr GetCompressionFunction(CompressionType type, const PhysicalType physical_type); + DUCKDB_API CompressionFunction &GetEmptyValidity(); //! Returns the encode function matching the encoding name. DUCKDB_API optional_ptr GetEncodeFunction(const string &name) const; @@ -403,6 +405,7 @@ struct DBConfig { string SanitizeAllowedPath(const string &path) const; private: + unique_ptr empty_validity; unique_ptr compression_functions; unique_ptr encoding_functions; unique_ptr cast_functions; diff --git a/src/include/duckdb/storage/compression/empty_validity.hpp b/src/include/duckdb/storage/compression/empty_validity.hpp new file mode 100644 index 000000000000..f7e70a59a85a --- /dev/null +++ b/src/include/duckdb/storage/compression/empty_validity.hpp @@ -0,0 +1,97 @@ +#pragma once + +#include "duckdb/function/compression_function.hpp" +#include "duckdb/storage/table/column_data.hpp" +#include "duckdb/storage/table/scan_state.hpp" +#include "duckdb/storage/table/column_data_checkpointer.hpp" + +namespace duckdb { + +class EmptyValidityCompression { +public: + struct EmptyValidityAnalyzeState : public AnalyzeState { + explicit EmptyValidityAnalyzeState(const CompressionInfo &info) : AnalyzeState(info) { + } + idx_t count = 0; + }; + struct EmptyValidityCompressionState : public CompressionState { + explicit EmptyValidityCompressionState(const CompressionInfo &info) : CompressionState(info) { + } + }; + struct EmptyValiditySegmentScanState : public SegmentScanState { + EmptyValiditySegmentScanState() { + } + }; + +public: + static unique_ptr CreateFunction() { + return make_uniq(CompressionType::COMPRESSION_AUTO, PhysicalType::BIT, InitAnalyze, + Analyze, FinalAnalyze, InitCompression, Compress, FinalizeCompress, + InitScan, Scan, ScanPartial, FetchRow, Skip, InitSegment); + } + +public: + static unique_ptr InitAnalyze(ColumnData &col_data, PhysicalType type) { + CompressionInfo info(col_data.GetBlockManager().GetBlockSize()); + return make_uniq(info); + } + static bool Analyze(AnalyzeState &state_p, Vector &input, idx_t count) { + auto &state = state_p.Cast(); + state.count += count; + return true; + } + static idx_t FinalAnalyze(AnalyzeState &state_p) { + return 0; + } + static unique_ptr InitCompression(ColumnDataCheckpointer &checkpointer, + unique_ptr state_p) { + auto res = make_uniq(state_p->info); + auto &state = state_p->Cast(); + + auto &db = checkpointer.GetDatabase(); + auto &type = checkpointer.GetType(); + + auto function = CreateFunction(); + auto &info = state.info; + auto compressed_segment = + ColumnSegment::CreateTransientSegment(db, *function, type, 0, info.GetBlockSize(), info.GetBlockSize()); + compressed_segment->count = state.count; + + auto &buffer_manager = BufferManager::GetBufferManager(checkpointer.GetDatabase()); + auto handle = buffer_manager.Pin(compressed_segment->block); + + auto &checkpointer_state = checkpointer.GetCheckpointState(); + checkpointer_state.FlushSegment(std::move(compressed_segment), std::move(handle), 0); + + return res; + } + static void Compress(CompressionState &state_p, Vector &scan_vector, idx_t count) { + return; + } + static void FinalizeCompress(CompressionState &state_p) { + return; + } + static unique_ptr InitScan(ColumnSegment &segment) { + return make_uniq(); + } + static void ScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result, + idx_t result_offset) { + return; + } + static void Scan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result) { + return; + } + static void FetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result, + idx_t result_idx) { + return; + } + static void Skip(ColumnSegment &segment, ColumnScanState &state, idx_t skip_count) { + return; + } + static unique_ptr InitSegment(ColumnSegment &segment, block_id_t block_id, + optional_ptr segment_state) { + return nullptr; + } +}; + +} // namespace duckdb diff --git a/src/include/duckdb/storage/table/column_data.hpp b/src/include/duckdb/storage/table/column_data.hpp index cc40d857cb2f..a4ca6d848ec9 100644 --- a/src/include/duckdb/storage/table/column_data.hpp +++ b/src/include/duckdb/storage/table/column_data.hpp @@ -81,6 +81,26 @@ class ColumnData { idx_t GetAllocationSize() const { return allocation_size; } + bool HasCompressionFunction() const { + return compression != nullptr; + } + const CompressionFunction &GetCompressionFunction() const { + D_ASSERT(HasCompressionFunction()); + return *compression; + } + + bool IsEmptyValidity() const { + if (type.id() != LogicalTypeId::VALIDITY) { + return false; + } + if (!parent->compression) { + return false; + } + if (parent->compression->validity != CompressionValidity::NO_VALIDITY_REQUIRED) { + return false; + } + return true; + } virtual void SetStart(idx_t new_start); //! The root type of the column diff --git a/src/main/database.cpp b/src/main/database.cpp index de60b1cf2948..ad6feb5509da 100644 --- a/src/main/database.cpp +++ b/src/main/database.cpp @@ -26,6 +26,7 @@ #include "duckdb/storage/storage_manager.hpp" #include "duckdb/transaction/transaction_manager.hpp" #include "duckdb/main/capi/extension_api.hpp" +#include "duckdb/storage/compression/empty_validity.hpp" #ifndef DUCKDB_NO_THREADS #include "duckdb/common/thread.hpp" @@ -34,6 +35,7 @@ namespace duckdb { DBConfig::DBConfig() { + empty_validity = EmptyValidityCompression::CreateFunction(); compression_functions = make_uniq(); encoding_functions = make_uniq(); encoding_functions->Initialize(*this); diff --git a/src/storage/compression/dictionary/decompression.cpp b/src/storage/compression/dictionary/decompression.cpp index a81c0e1221ee..dd1031328a63 100644 --- a/src/storage/compression/dictionary/decompression.cpp +++ b/src/storage/compression/dictionary/decompression.cpp @@ -43,15 +43,18 @@ void CompressedStringScanState::Initialize(ColumnSegment &segment, bool initiali block_size = segment.GetBlockManager().GetBlockSize(); dict = DictionaryCompression::GetDictionary(segment, *handle); - dictionary = make_buffer(segment.type, index_buffer_count); - dictionary_size = index_buffer_count; if (!initialize_dictionary) { // Used by fetch, as fetch will never produce a DictionaryVector return; } + dictionary = make_buffer(segment.type, index_buffer_count); + dictionary_size = index_buffer_count; auto dict_child_data = FlatVector::GetData(*(dictionary)); + auto &validity = FlatVector::Validity(*dictionary); + D_ASSERT(index_buffer_count >= 1); + validity.SetInvalid(0); for (uint32_t i = 0; i < index_buffer_count; i++) { // NOTE: the passing of dict_child_vector, will not be used, its for big strings uint16_t str_len = GetStringLength(i); @@ -61,6 +64,7 @@ void CompressedStringScanState::Initialize(ColumnSegment &segment, bool initiali void CompressedStringScanState::ScanToFlatVector(Vector &result, idx_t result_offset, idx_t start, idx_t scan_count) { auto result_data = FlatVector::GetData(result); + auto &validity = FlatVector::Validity(result); // Handling non-bitpacking-group-aligned start values; idx_t start_offset = start % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; @@ -82,6 +86,9 @@ void CompressedStringScanState::ScanToFlatVector(Vector &result, idx_t result_of for (idx_t i = 0; i < scan_count; i++) { // Lookup dict offset in index buffer auto string_number = sel_vec->get_index(i + start_offset); + if (string_number == 0) { + validity.SetInvalid(result_offset + i); + } auto dict_offset = index_buffer_ptr[string_number]; auto str_len = GetStringLength(UnsafeNumericCast(string_number)); result_data[result_offset + i] = FetchStringFromDict(UnsafeNumericCast(dict_offset), str_len); @@ -109,6 +116,8 @@ void CompressedStringScanState::ScanToDictionaryVector(ColumnSegment &segment, V BitpackingPrimitives::UnPackBuffer(dst, src, scan_count, current_width); result.Dictionary(*(dictionary), dictionary_size, *sel_vec, scan_count); + // FIXME: this assumes the type is VectorType::DICTIONARY + // this could fail if the result is a ConstantVector instead. DictionaryVector::SetDictionaryId(result, to_string(CastPointerToValue(&segment))); } diff --git a/src/storage/compression/dictionary_compression.cpp b/src/storage/compression/dictionary_compression.cpp index 1ea1bb326184..d45aae892fbe 100644 --- a/src/storage/compression/dictionary_compression.cpp +++ b/src/storage/compression/dictionary_compression.cpp @@ -157,7 +157,7 @@ void DictionaryCompressionStorage::StringFetchRow(ColumnSegment &segment, Column // Get Function //===--------------------------------------------------------------------===// CompressionFunction DictionaryCompressionFun::GetFunction(PhysicalType data_type) { - return CompressionFunction( + auto res = CompressionFunction( CompressionType::COMPRESSION_DICTIONARY, data_type, DictionaryCompressionStorage ::StringInitAnalyze, DictionaryCompressionStorage::StringAnalyze, DictionaryCompressionStorage::StringFinalAnalyze, DictionaryCompressionStorage::InitCompression, DictionaryCompressionStorage::Compress, @@ -165,6 +165,8 @@ CompressionFunction DictionaryCompressionFun::GetFunction(PhysicalType data_type DictionaryCompressionStorage::StringScan, DictionaryCompressionStorage::StringScanPartial, DictionaryCompressionStorage::StringFetchRow, UncompressedFunctions::EmptySkip, UncompressedStringStorage::StringInitSegment); + res.validity = CompressionValidity::NO_VALIDITY_REQUIRED; + return res; } bool DictionaryCompressionFun::TypeIsSupported(const PhysicalType physical_type) { diff --git a/src/storage/table/column_data_checkpointer.cpp b/src/storage/table/column_data_checkpointer.cpp index fec8b3543dfd..37ebd6077ea7 100644 --- a/src/storage/table/column_data_checkpointer.cpp +++ b/src/storage/table/column_data_checkpointer.cpp @@ -1,6 +1,7 @@ #include "duckdb/storage/table/column_data_checkpointer.hpp" #include "duckdb/main/config.hpp" #include "duckdb/storage/table/update_segment.hpp" +#include "duckdb/storage/compression/empty_validity.hpp" #include "duckdb/storage/data_table.hpp" #include "duckdb/parser/column_definition.hpp" #include "duckdb/storage/table/scan_state.hpp" @@ -15,9 +16,13 @@ ColumnDataCheckpointer::ColumnDataCheckpointer(ColumnData &col_data_p, RowGroup checkpoint_info(checkpoint_info_p) { auto &config = DBConfig::GetConfig(GetDatabase()); - auto functions = config.GetCompressionFunctions(GetType().InternalType()); - for (auto &func : functions) { - compression_functions.push_back(&func.get()); + if (is_validity && col_data_p.IsEmptyValidity()) { + compression_functions.push_back(config.GetEmptyValidity()); + } else { + auto functions = config.GetCompressionFunctions(GetType().InternalType()); + for (auto &func : functions) { + compression_functions.push_back(&func.get()); + } } } diff --git a/src/storage/table/column_segment.cpp b/src/storage/table/column_segment.cpp index 491096b7f2e4..39c58f8b8d38 100644 --- a/src/storage/table/column_segment.cpp +++ b/src/storage/table/column_segment.cpp @@ -35,6 +35,10 @@ unique_ptr ColumnSegment::CreatePersistentSegment(DatabaseInstanc if (block_id == INVALID_BLOCK) { function = config.GetCompressionFunction(CompressionType::COMPRESSION_CONSTANT, type.InternalType()); + } else if (type.id() == LogicalTypeId::VALIDITY && compression_type == CompressionType::COMPRESSION_AUTO) { + // The validity is not actually stored in this block, this is just a dummy + function = config.GetEmptyValidity(); + block = block_manager.RegisterBlock(block_id); } else { function = config.GetCompressionFunction(compression_type, type.InternalType()); block = block_manager.RegisterBlock(block_id);