Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Tishj committed Jan 16, 2025
1 parent 5a05754 commit f7344eb
Show file tree
Hide file tree
Showing 7 changed files with 384 additions and 299 deletions.
45 changes: 7 additions & 38 deletions src/include/duckdb/storage/compression/dict_fsst/analyze.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,50 +11,19 @@ namespace dict_fsst {
//===--------------------------------------------------------------------===//
// Analyze
//===--------------------------------------------------------------------===//
struct DictFSSTAnalyzeState : public DictFSSTCompressionState {
struct DictFSSTAnalyzeState : public AnalyzeState {
public:
explicit DictFSSTAnalyzeState(const CompressionInfo &info);
~DictFSSTAnalyzeState() override;

public:
optional_idx LookupString(const string_t &str) override;
void AddNewString(const StringData &str) override;
void AddLookup(uint32_t lookup_result) override;
void AddNull() override;
idx_t RequiredSpace(bool new_string, idx_t string_size) override;
void Flush(bool final = false) override;
void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) override;
bool EncodeDictionary() override;
StringData GetString(const string_t *strings, idx_t index, idx_t raw_index) override;
void Verify() override;
bool Analyze(Vector &input, idx_t count);
idx_t FinalAnalyze();

public:
idx_t segment_count;
idx_t current_tuple_count;
idx_t current_unique_count;
idx_t current_dict_size;
StringHeap heap;
//! string -> string_length
string_map_t<uint32_t> current_string_map;
bitpacking_width_t current_width;
bitpacking_width_t next_width;

bitpacking_width_t string_length_bitwidth = NumericLimits<uint8_t>::Maximum();
uint32_t max_length = 0;

idx_t total_space = 0;
unsafe_unique_array<unsigned char> compression_buffer;
idx_t compression_buffer_size = 0;
};

struct DictFSSTCompressionAnalyzeState : public AnalyzeState {
public:
explicit DictFSSTCompressionAnalyzeState(const CompressionInfo &info)
: AnalyzeState(info), analyze_state(make_uniq<DictFSSTAnalyzeState>(info)) {
}

public:
unique_ptr<DictFSSTAnalyzeState> analyze_state;
idx_t max_string_length = 0;
bool contains_nulls = false;
idx_t total_string_length = 0;
idx_t total_count = 0;
};

} // namespace dict_fsst
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct DictFSSTCompression {
public:
//! Dictionary header size at the beginning of the string segment (offset + length)
static constexpr uint16_t DICTIONARY_HEADER_SIZE = sizeof(dict_fsst_compression_header_t);
static constexpr idx_t STRING_SIZE_LIMIT = 16384;

public:
static bool HasEnoughSpace(idx_t current_count, idx_t index_count, idx_t dict_size,
Expand Down
80 changes: 42 additions & 38 deletions src/include/duckdb/storage/compression/dict_fsst/compression.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "duckdb/common/typedefs.hpp"
#include "duckdb/storage/compression/dict_fsst/common.hpp"
#include "duckdb/storage/compression/dict_fsst/analyze.hpp"
#include "duckdb/function/compression_function.hpp"
#include "duckdb/common/string_map_set.hpp"
#include "duckdb/storage/table/column_data_checkpointer.hpp"
Expand All @@ -19,66 +20,69 @@ namespace dict_fsst {
// scanning the whole dictionary at once and then scanning the selection buffer for each emitted vector. Secondly, it
// allows for efficient bitpacking compression as the selection values should remain relatively small.

struct EncodedInputData {
public:
explicit EncodedInputData(Allocator &allocator) : heap(allocator) {
}

public:
void Reset() {
input_data.clear();
heap.Destroy();
}

public:
StringHeap heap;
vector<string_t> input_data;
};

//===--------------------------------------------------------------------===//
// Compress
//===--------------------------------------------------------------------===//
struct DictFSSTCompressionCompressState : public DictFSSTCompressionState {
struct DictFSSTCompressionCompressState : public CompressionState {
public:
DictFSSTCompressionCompressState(ColumnDataCheckpointData &checkpoint_data_p, const CompressionInfo &info);
DictFSSTCompressionCompressState(ColumnDataCheckpointData &checkpoint_data_p,
unique_ptr<DictFSSTAnalyzeState> &&state);
~DictFSSTCompressionCompressState() override;

public:
void CreateEmptySegment(idx_t row_start);
void Verify() override;
optional_idx LookupString(const string_t &str) override;
void AddNewString(const StringData &str) override;
void AddNull() override;
void AddLookup(uint32_t lookup_result) override;
idx_t RequiredSpace(bool new_string, idx_t string_size) override;
void Flush(bool final = false) override;
void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) override;
bool EncodeDictionary() override;
StringData GetString(const string_t *strings, idx_t index, idx_t raw_index) override;
idx_t Finalize();

void Compress(Vector &scan_vector, idx_t count);
void FinalizeCompress();
void Flush();

public:
ColumnDataCheckpointData &checkpoint_data;
CompressionFunction &function;

// State regarding current segment
unique_ptr<ColumnSegment> current_segment;
BufferHandle current_handle;
StringDictionaryContainer current_dictionary;
data_ptr_t current_end_ptr;
//! Offset at which to write the next dictionary string
idx_t dictionary_offset = 0;

public:
idx_t string_lengths_space;
vector<uint32_t> string_lengths;
idx_t dict_count = 0;
bitpacking_width_t string_lengths_width = 0;
//! For DICT_FSST we delay encoding of new entries, which means
// we have to prepare for strings being exploded in size by max 2x
// so we have to muddy out 'string_lengths_width'
bitpacking_width_t real_string_lengths_width = 0;
uint32_t max_string_length = 0;

// Buffers and map for current segment
idx_t dictionary_indices_space;
vector<uint32_t> dictionary_indices;
bitpacking_width_t dictionary_indices_width = 0;
//! uint32_t max_dictionary_index; (this is 'dict_count')

//! string -> dictionary_index (for lookups)
string_map_t<uint32_t> current_string_map;
vector<uint32_t> dictionary_string_lengths;
uint32_t max_length = 0;
bitpacking_width_t string_length_bitwidth = 0;
//! strings added to the dictionary waiting to be encoded
vector<string_t> dictionary_encoding_buffer;
//! for DICT_FSST we store uncompressed strings in the 'current_string_map', this owns that memory
StringHeap uncompressed_dictionary_copy;

vector<uint32_t> selection_buffer;
//! This is used for FSST_ONLY, to store the memory of the encoded input
unsafe_unique_array<unsigned char> encoding_buffer;
idx_t encoding_buffer_size = 0;

bitpacking_width_t current_width = 0;
bitpacking_width_t next_width = 0;
public:
void *encoder = nullptr;
idx_t symbol_table_size = DConstants::INVALID_INDEX;
DictionaryAppendState append_state = DictionaryAppendState::REGULAR;
bool all_unique = true;

EncodedInputData encoded_input;
public:
idx_t tuple_count = 0;
unique_ptr<DictFSSTAnalyzeState> analyze;
};

} // namespace dict_fsst
Expand Down
16 changes: 7 additions & 9 deletions src/storage/compression/dict_fsst.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,28 +70,26 @@ struct DictFSSTCompressionStorage {
//===--------------------------------------------------------------------===//
unique_ptr<AnalyzeState> DictFSSTCompressionStorage::StringInitAnalyze(ColumnData &col_data, PhysicalType type) {
CompressionInfo info(col_data.GetBlockManager().GetBlockSize());
return make_uniq<DictFSSTCompressionAnalyzeState>(info);
return make_uniq<DictFSSTAnalyzeState>(info);
}

bool DictFSSTCompressionStorage::StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count) {
auto &state = state_p.Cast<DictFSSTCompressionAnalyzeState>();
return state.analyze_state->UpdateState(input, count);
auto &analyze_state = state_p.Cast<DictFSSTAnalyzeState>();
return analyze_state.Analyze(input, count);
}

idx_t DictFSSTCompressionStorage::StringFinalAnalyze(AnalyzeState &state_p) {
auto &analyze_state = state_p.Cast<DictFSSTCompressionAnalyzeState>();
auto &state = *analyze_state.analyze_state;
state.Flush();

return state.total_space;
auto &analyze_state = state_p.Cast<DictFSSTAnalyzeState>();
return analyze_state.FinalAnalyze();
}

//===--------------------------------------------------------------------===//
// Compress
//===--------------------------------------------------------------------===//
unique_ptr<CompressionState> DictFSSTCompressionStorage::InitCompression(ColumnDataCheckpointData &checkpoint_data,
unique_ptr<AnalyzeState> state) {
return make_uniq<DictFSSTCompressionCompressState>(checkpoint_data, state->info);
return make_uniq<DictFSSTCompressionCompressState>(
checkpoint_data, unique_ptr_cast<AnalyzeState, DictFSSTAnalyzeState>(std::move(state)));
}

void DictFSSTCompressionStorage::Compress(CompressionState &state_p, Vector &scan_vector, idx_t count) {
Expand Down
Loading

0 comments on commit f7344eb

Please sign in to comment.