
Commit

VERY MUCH WIP: write empty validity. it works, but it's for sure not what we want to do
Tishj committed Dec 14, 2024
1 parent 55219d8 commit e4db440
Showing 10 changed files with 158 additions and 6 deletions.
4 changes: 4 additions & 0 deletions src/function/compression_config.cpp
@@ -101,4 +101,8 @@ optional_ptr<CompressionFunction> DBConfig::GetCompressionFunction(CompressionTy
return LoadCompressionFunction(*compression_functions, type, physical_type);
}

CompressionFunction &DBConfig::GetEmptyValidity() {
return *empty_validity;
}

} // namespace duckdb
6 changes: 6 additions & 0 deletions src/include/duckdb/function/compression_function.hpp
@@ -203,6 +203,8 @@ typedef unique_ptr<ColumnSegmentState> (*compression_deserialize_state_t)(Deseri
//! Function prototype for cleaning up the segment state when the column data is dropped
typedef void (*compression_cleanup_state_t)(ColumnSegment &segment);

enum class CompressionValidity : uint8_t { REQUIRES_VALIDITY, NO_VALIDITY_REQUIRED };

class CompressionFunction {
public:
CompressionFunction(CompressionType type, PhysicalType data_type, compression_init_analyze_t init_analyze,
@@ -297,6 +299,10 @@ class CompressionFunction {
compression_deserialize_state_t deserialize_state;
//! Cleanup the segment state (optional)
compression_cleanup_state_t cleanup_state;

//! Whether the validity mask has to be compressed separately, or whether this
//! compression function also takes care of (de)compressing the validity itself
CompressionValidity validity = CompressionValidity::REQUIRES_VALIDITY;
};

//! The set of compression functions
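
A minimal sketch of how the new flag is meant to be consumed (the helper name is illustrative, not part of this commit): a validity child only needs its own compression when the parent's function still requires a separately stored validity mask.

static bool NeedsSeparateValidity(const CompressionFunction &parent_function) {
// dictionary compression (changed further down) opts out by setting NO_VALIDITY_REQUIRED
return parent_function.validity == CompressionValidity::REQUIRES_VALIDITY;
}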
3 changes: 3 additions & 0 deletions src/include/duckdb/main/config.hpp
@@ -33,6 +33,7 @@
#include "duckdb/parser/parser_extension.hpp"
#include "duckdb/planner/operator_extension.hpp"
#include "duckdb/storage/compression/bitpacking.hpp"
#include "duckdb/function/compression_function.hpp"
#include "duckdb/function/encoding_function.hpp"

namespace duckdb {
@@ -362,6 +363,7 @@ struct DBConfig {
//! Returns the compression function matching the compression and physical type.
DUCKDB_API optional_ptr<CompressionFunction> GetCompressionFunction(CompressionType type,
const PhysicalType physical_type);
DUCKDB_API CompressionFunction &GetEmptyValidity();

//! Returns the encode function matching the encoding name.
DUCKDB_API optional_ptr<EncodingFunction> GetEncodeFunction(const string &name) const;
@@ -403,6 +405,7 @@ struct DBConfig {
string SanitizeAllowedPath(const string &path) const;

private:
unique_ptr<CompressionFunction> empty_validity;
unique_ptr<CompressionFunctionSet> compression_functions;
unique_ptr<EncodingFunctionSet> encoding_functions;
unique_ptr<CastFunctionSet> cast_functions;
97 changes: 97 additions & 0 deletions src/include/duckdb/storage/compression/empty_validity.hpp
@@ -0,0 +1,97 @@
#pragma once

#include "duckdb/function/compression_function.hpp"
#include "duckdb/storage/table/column_data.hpp"
#include "duckdb/storage/table/scan_state.hpp"
#include "duckdb/storage/table/column_data_checkpointer.hpp"

namespace duckdb {

class EmptyValidityCompression {
public:
struct EmptyValidityAnalyzeState : public AnalyzeState {
explicit EmptyValidityAnalyzeState(const CompressionInfo &info) : AnalyzeState(info) {
}
idx_t count = 0;
};
struct EmptyValidityCompressionState : public CompressionState {
explicit EmptyValidityCompressionState(const CompressionInfo &info) : CompressionState(info) {
}
};
struct EmptyValiditySegmentScanState : public SegmentScanState {
EmptyValiditySegmentScanState() {
}
};

public:
static unique_ptr<CompressionFunction> CreateFunction() {
return make_uniq<CompressionFunction>(CompressionType::COMPRESSION_AUTO, PhysicalType::BIT, InitAnalyze,
Analyze, FinalAnalyze, InitCompression, Compress, FinalizeCompress,
InitScan, Scan, ScanPartial, FetchRow, Skip, InitSegment);
}

public:
static unique_ptr<AnalyzeState> InitAnalyze(ColumnData &col_data, PhysicalType type) {
CompressionInfo info(col_data.GetBlockManager().GetBlockSize());
return make_uniq<EmptyValidityAnalyzeState>(info);
}
static bool Analyze(AnalyzeState &state_p, Vector &input, idx_t count) {
auto &state = state_p.Cast<EmptyValidityAnalyzeState>();
state.count += count;
return true;
}
static idx_t FinalAnalyze(AnalyzeState &state_p) {
return 0;
}
static unique_ptr<CompressionState> InitCompression(ColumnDataCheckpointer &checkpointer,
unique_ptr<AnalyzeState> state_p) {
auto res = make_uniq<EmptyValidityCompressionState>(state_p->info);
auto &state = state_p->Cast<EmptyValidityAnalyzeState>();

auto &db = checkpointer.GetDatabase();
auto &type = checkpointer.GetType();

auto function = CreateFunction();
auto &info = state.info;
auto compressed_segment =
ColumnSegment::CreateTransientSegment(db, *function, type, 0, info.GetBlockSize(), info.GetBlockSize());
compressed_segment->count = state.count;

auto &buffer_manager = BufferManager::GetBufferManager(checkpointer.GetDatabase());
auto handle = buffer_manager.Pin(compressed_segment->block);

auto &checkpointer_state = checkpointer.GetCheckpointState();
checkpointer_state.FlushSegment(std::move(compressed_segment), std::move(handle), 0);

return res;
}
static void Compress(CompressionState &state_p, Vector &scan_vector, idx_t count) {
// no-op: nothing is written, the segment only records the row count
return;
}
static void FinalizeCompress(CompressionState &state_p) {
// no-op
return;
}
static unique_ptr<SegmentScanState> InitScan(ColumnSegment &segment) {
return make_uniq<EmptyValiditySegmentScanState>();
}
static void ScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result,
idx_t result_offset) {
// no-op: the result's validity mask is left untouched (all rows remain valid)
return;
}
static void Scan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result) {
// no-op: the result's validity mask is left untouched (all rows remain valid)
return;
}
static void FetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, Vector &result,
idx_t result_idx) {
return;
}
static void Skip(ColumnSegment &segment, ColumnScanState &state, idx_t skip_count) {
return;
}
static unique_ptr<CompressedSegmentState> InitSegment(ColumnSegment &segment, block_id_t block_id,
optional_ptr<ColumnSegmentState> segment_state) {
return nullptr;
}
};

} // namespace duckdb
20 changes: 20 additions & 0 deletions src/include/duckdb/storage/table/column_data.hpp
@@ -81,6 +81,26 @@ class ColumnData {
idx_t GetAllocationSize() const {
return allocation_size;
}
bool HasCompressionFunction() const {
return compression != nullptr;
}
const CompressionFunction &GetCompressionFunction() const {
D_ASSERT(HasCompressionFunction());
return *compression;
}

bool IsEmptyValidity() const {
if (type.id() != LogicalTypeId::VALIDITY) {
return false;
}
if (!parent->compression) {
return false;
}
if (parent->compression->validity != CompressionValidity::NO_VALIDITY_REQUIRED) {
return false;
}
return true;
}

virtual void SetStart(idx_t new_start);
//! The root type of the column
2 changes: 2 additions & 0 deletions src/main/database.cpp
@@ -26,6 +26,7 @@
#include "duckdb/storage/storage_manager.hpp"
#include "duckdb/transaction/transaction_manager.hpp"
#include "duckdb/main/capi/extension_api.hpp"
#include "duckdb/storage/compression/empty_validity.hpp"

#ifndef DUCKDB_NO_THREADS
#include "duckdb/common/thread.hpp"
@@ -34,6 +35,7 @@
namespace duckdb {

DBConfig::DBConfig() {
empty_validity = EmptyValidityCompression::CreateFunction();
compression_functions = make_uniq<CompressionFunctionSet>();
encoding_functions = make_uniq<EncodingFunctionSet>();
encoding_functions->Initialize(*this);
13 changes: 11 additions & 2 deletions src/storage/compression/dictionary/decompression.cpp
@@ -43,15 +43,18 @@ void CompressedStringScanState::Initialize(ColumnSegment &segment, bool initiali
block_size = segment.GetBlockManager().GetBlockSize();

dict = DictionaryCompression::GetDictionary(segment, *handle);
dictionary = make_buffer<Vector>(segment.type, index_buffer_count);
dictionary_size = index_buffer_count;

if (!initialize_dictionary) {
// Used by fetch, as fetch will never produce a DictionaryVector
return;
}

dictionary = make_buffer<Vector>(segment.type, index_buffer_count);
dictionary_size = index_buffer_count;
auto dict_child_data = FlatVector::GetData<string_t>(*(dictionary));
auto &validity = FlatVector::Validity(*dictionary);
D_ASSERT(index_buffer_count >= 1);
validity.SetInvalid(0);
for (uint32_t i = 0; i < index_buffer_count; i++) {
// NOTE: the dict_child_vector passed in is not used; it is only needed for big strings
uint16_t str_len = GetStringLength(i);
@@ -61,6 +64,7 @@

void CompressedStringScanState::ScanToFlatVector(Vector &result, idx_t result_offset, idx_t start, idx_t scan_count) {
auto result_data = FlatVector::GetData<string_t>(result);
auto &validity = FlatVector::Validity(result);

// Handling non-bitpacking-group-aligned start values;
idx_t start_offset = start % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE;
@@ -82,6 +86,9 @@
for (idx_t i = 0; i < scan_count; i++) {
// Lookup dict offset in index buffer
auto string_number = sel_vec->get_index(i + start_offset);
if (string_number == 0) {
validity.SetInvalid(result_offset + i);
}
auto dict_offset = index_buffer_ptr[string_number];
auto str_len = GetStringLength(UnsafeNumericCast<sel_t>(string_number));
result_data[result_offset + i] = FetchStringFromDict(UnsafeNumericCast<int32_t>(dict_offset), str_len);
@@ -109,6 +116,8 @@ void CompressedStringScanState::ScanToDictionaryVector(ColumnSegment &segment, V
BitpackingPrimitives::UnPackBuffer<sel_t>(dst, src, scan_count, current_width);

result.Dictionary(*(dictionary), dictionary_size, *sel_vec, scan_count);
// FIXME: this assumes the type is VectorType::DICTIONARY
// this could fail if the result is a ConstantVector instead.
DictionaryVector::SetDictionaryId(result, to_string(CastPointerToValue(&segment)));
}

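
As a hedged illustration of the FIXME above (not part of this commit), a defensive variant would only tag the result when it really became a dictionary vector:

if (result.GetVectorType() == VectorType::DICTIONARY_VECTOR) {
// only a genuine dictionary vector carries a dictionary id; a ConstantVector result is left untagged
DictionaryVector::SetDictionaryId(result, to_string(CastPointerToValue(&segment)));
}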
4 changes: 3 additions & 1 deletion src/storage/compression/dictionary_compression.cpp
@@ -157,14 +157,16 @@ void DictionaryCompressionStorage::StringFetchRow(ColumnSegment &segment, Column
// Get Function
//===--------------------------------------------------------------------===//
CompressionFunction DictionaryCompressionFun::GetFunction(PhysicalType data_type) {
return CompressionFunction(
auto res = CompressionFunction(
CompressionType::COMPRESSION_DICTIONARY, data_type, DictionaryCompressionStorage ::StringInitAnalyze,
DictionaryCompressionStorage::StringAnalyze, DictionaryCompressionStorage::StringFinalAnalyze,
DictionaryCompressionStorage::InitCompression, DictionaryCompressionStorage::Compress,
DictionaryCompressionStorage::FinalizeCompress, DictionaryCompressionStorage::StringInitScan,
DictionaryCompressionStorage::StringScan, DictionaryCompressionStorage::StringScanPartial<false>,
DictionaryCompressionStorage::StringFetchRow, UncompressedFunctions::EmptySkip,
UncompressedStringStorage::StringInitSegment);
res.validity = CompressionValidity::NO_VALIDITY_REQUIRED;
return res;
}

bool DictionaryCompressionFun::TypeIsSupported(const PhysicalType physical_type) {
11 changes: 8 additions & 3 deletions src/storage/table/column_data_checkpointer.cpp
@@ -1,6 +1,7 @@
#include "duckdb/storage/table/column_data_checkpointer.hpp"
#include "duckdb/main/config.hpp"
#include "duckdb/storage/table/update_segment.hpp"
#include "duckdb/storage/compression/empty_validity.hpp"
#include "duckdb/storage/data_table.hpp"
#include "duckdb/parser/column_definition.hpp"
#include "duckdb/storage/table/scan_state.hpp"
@@ -15,9 +16,13 @@ ColumnDataCheckpointer::ColumnDataCheckpointer(ColumnData &col_data_p, RowGroup
checkpoint_info(checkpoint_info_p) {

auto &config = DBConfig::GetConfig(GetDatabase());
auto functions = config.GetCompressionFunctions(GetType().InternalType());
for (auto &func : functions) {
compression_functions.push_back(&func.get());
if (is_validity && col_data_p.IsEmptyValidity()) {
compression_functions.push_back(config.GetEmptyValidity());
} else {
auto functions = config.GetCompressionFunctions(GetType().InternalType());
for (auto &func : functions) {
compression_functions.push_back(&func.get());
}
}
}

4 changes: 4 additions & 0 deletions src/storage/table/column_segment.cpp
@@ -35,6 +35,10 @@ unique_ptr<ColumnSegment> ColumnSegment::CreatePersistentSegment(DatabaseInstanc

if (block_id == INVALID_BLOCK) {
function = config.GetCompressionFunction(CompressionType::COMPRESSION_CONSTANT, type.InternalType());
} else if (type.id() == LogicalTypeId::VALIDITY && compression_type == CompressionType::COMPRESSION_AUTO) {
// The validity is not actually stored in this block, this is just a dummy
function = config.GetEmptyValidity();
block = block_manager.RegisterBlock(block_id);
} else {
function = config.GetCompressionFunction(compression_type, type.InternalType());
block = block_manager.RegisterBlock(block_id);
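
For context, a hedged sketch (the check function is assumed, not part of this commit) of why the dummy is safe on reads: the empty validity function's scan callbacks are no-ops, so a freshly created result vector keeps its default all-valid mask.

static void CheckDummyValidityScan(ColumnSegment &segment) {
Vector result(LogicalType::BOOLEAN, STANDARD_VECTOR_SIZE);
ColumnScanState state;
// Scan is a no-op, so no validity bits are cleared
EmptyValidityCompression::Scan(segment, state, STANDARD_VECTOR_SIZE, result);
D_ASSERT(FlatVector::Validity(result).AllValid());
}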
