Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Tishj committed Jan 7, 2025
1 parent 126e6fa commit d554f99
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct DictFSSTAnalyzeState : public DictFSSTCompressionState {
void AddNewString(string_t str) override;
void AddLastLookup() override;
void AddNull() override;
bool CalculateSpaceRequirements(bool new_string, idx_t string_size) override;
bool HasRoomForString(bool new_string, idx_t string_size) override;
void Flush(bool final = false) override;
void Verify() override;

Expand Down
3 changes: 2 additions & 1 deletion src/include/duckdb/storage/compression/dict_fsst/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ typedef struct {
uint32_t index_buffer_offset;
uint32_t index_buffer_count;
uint32_t bitpacking_width;
bool fsst_encoded;
} dict_fsst_compression_header_t;

struct DictFSSTCompression {
Expand Down Expand Up @@ -54,7 +55,7 @@ class DictFSSTCompressionState : public CompressionState {
// Add a null value to the compression state
virtual void AddNull() = 0;
// Needs to be called before adding a value. Will return false if a flush is required first.
virtual bool CalculateSpaceRequirements(bool new_string, idx_t string_size) = 0;
virtual bool HasRoomForString(bool new_string, idx_t string_size) = 0;
// Flush the segment to disk if compressing or reset the counters if analyzing
virtual void Flush(bool final = false) = 0;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct DictFSSTCompressionCompressState : public DictFSSTCompressionState {
void AddNewString(string_t str) override;
void AddNull() override;
void AddLastLookup() override;
bool CalculateSpaceRequirements(bool new_string, idx_t string_size) override;
bool HasRoomForString(bool new_string, idx_t string_size) override;
void Flush(bool final = false) override;
idx_t Finalize();

Expand Down
32 changes: 32 additions & 0 deletions src/storage/compression/dict_fsst.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,38 @@
#include "duckdb/function/compression/compression.hpp"
#include "duckdb/function/compression_function.hpp"

/*
Data layout per segment:
+-----------------------------------------------------+
|                       Header                        |
|   +---------------------------------------------+   |
|   |    dict_fsst_compression_header_t header    |   |
|   +---------------------------------------------+   |
|                                                     |
+-----------------------------------------------------+
|                  Selection Buffer                   |
|   +---------------------------------------------+   |
|   |         uint16_t index_buffer_idx[]         |   |
|   +---------------------------------------------+   |
|           tuple index -> index buffer idx           |
|                                                     |
+-----------------------------------------------------+
|                    Index Buffer                     |
|   +---------------------------------------------+   |
|   |        uint16_t dictionary_offset[]         |   |
|   +---------------------------------------------+   |
|      string_index -> offset in the dictionary       |
|                                                     |
+-----------------------------------------------------+
|                     Dictionary                      |
|   +---------------------------------------------+   |
|   |          uint8_t *raw_string_data           |   |
|   +---------------------------------------------+   |
|           the string data without lengths           |
|                                                     |
+-----------------------------------------------------+
*/

namespace duckdb {
namespace dict_fsst {

Expand Down
2 changes: 1 addition & 1 deletion src/storage/compression/dict_fsst/analyze.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void DictFSSTAnalyzeState::AddNull() {
current_tuple_count++;
}

bool DictFSSTAnalyzeState::CalculateSpaceRequirements(bool new_string, idx_t string_size) {
bool DictFSSTAnalyzeState::HasRoomForString(bool new_string, idx_t string_size) {
if (!new_string) {
return DictFSSTCompression::HasEnoughSpace(current_tuple_count + 1, current_unique_count, current_dict_size,
current_width, info.GetBlockSize());
Expand Down
5 changes: 3 additions & 2 deletions src/storage/compression/dict_fsst/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ bool DictFSSTCompressionState::UpdateState(Vector &scan_vector, idx_t count) {
new_string = !LookupString(data[idx]);
}

bool fits = CalculateSpaceRequirements(new_string, string_size);
bool fits = HasRoomForString(new_string, string_size);
if (!fits) {
// TODO: Check if the dictionary requires FSST encoding
Flush();
new_string = true;

fits = CalculateSpaceRequirements(new_string, string_size);
fits = HasRoomForString(new_string, string_size);
if (!fits) {
throw InternalException("Dictionary compression could not write to new segment");
}
Expand Down
16 changes: 13 additions & 3 deletions src/storage/compression/dict_fsst/compression.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "duckdb/storage/compression/dict_fsst/compression.hpp"
#include "duckdb/storage/segment/uncompressed.hpp"
#include "fsst.h"

namespace duckdb {
namespace dict_fsst {
Expand Down Expand Up @@ -61,6 +62,11 @@ void DictFSSTCompressionCompressState::AddNewString(string_t str) {
UncompressedStringStorage::UpdateStringStats(current_segment->stats, str);

// Copy string to dict
// New entries are written directly in front of the existing ones (the dictionary grows backwards from the end pointer)
// [............xxxxooooooooo]
// x: new string
// o: existing string
// .: (currently) unused space
current_dictionary.size += str.GetSize();
auto dict_pos = current_end_ptr - current_dictionary.size;
memcpy(dict_pos, str.GetData(), str.GetSize());
Expand Down Expand Up @@ -91,14 +97,18 @@ void DictFSSTCompressionCompressState::AddLastLookup() {
current_segment->count++;
}

bool DictFSSTCompressionCompressState::CalculateSpaceRequirements(bool new_string, idx_t string_size) {
bool DictFSSTCompressionCompressState::HasRoomForString(bool new_string, idx_t string_size) {
static constexpr uint16_t FSST_SYMBOL_TABLE_SIZE = sizeof(duckdb_fsst_decoder_t);

auto block_size = info.GetBlockSize();

if (!new_string) {
return DictFSSTCompression::HasEnoughSpace(current_segment->count.load() + 1, index_buffer.size(),
current_dictionary.size, current_width, info.GetBlockSize());
current_dictionary.size, current_width, block_size);
}
next_width = BitpackingPrimitives::MinimumBitWidth(index_buffer.size() - 1 + new_string);
return DictFSSTCompression::HasEnoughSpace(current_segment->count.load() + 1, index_buffer.size() + 1,
current_dictionary.size + string_size, next_width, info.GetBlockSize());
current_dictionary.size + string_size, next_width, block_size);
}

void DictFSSTCompressionCompressState::Flush(bool final) {
Expand Down
5 changes: 5 additions & 0 deletions src/storage/compression/dictionary/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ void DictionaryCompressionCompressState::AddNewString(string_t str) {
UncompressedStringStorage::UpdateStringStats(current_segment->stats, str);

// Copy string to dict
// New entries are written directly in front of the existing ones (the dictionary grows backwards from the end pointer)
// [............xxxxooooooooo]
// x: new string
// o: existing string
// .: (currently) unused space
current_dictionary.size += str.GetSize();
auto dict_pos = current_end_ptr - current_dictionary.size;
memcpy(dict_pos, str.GetData(), str.GetSize());
Expand Down
11 changes: 4 additions & 7 deletions src/storage/compression/fsst.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ idx_t FSSTStorage::StringFinalAnalyze(AnalyzeState &state_p) {
BitpackingPrimitives::GetRequiredSize(string_count + state.empty_strings, minimum_width);

auto estimated_base_size = double(bitpacked_offsets_size + compressed_dict_size) * (1 / ANALYSIS_SAMPLE_SIZE);
auto num_blocks = estimated_base_size / double(state.info.GetBlockSize() - sizeof(duckdb_fsst_decoder_t));
auto num_blocks = estimated_base_size / (double(state.info.GetBlockSize() - sizeof(duckdb_fsst_decoder_t)));
auto symtable_size = num_blocks * sizeof(duckdb_fsst_decoder_t);
auto estimated_size = estimated_base_size + symtable_size;

Expand Down Expand Up @@ -352,12 +352,9 @@ class FSSTCompressionState : public CompressionState {
reinterpret_cast<uint32_t *>(index_buffer.data()),
current_segment->count, current_width);

// Write the fsst symbol table or nothing
if (fsst_encoder != nullptr) {
memcpy(base_ptr + symbol_table_offset, &fsst_serialized_symbol_table[0], fsst_serialized_symbol_table_size);
} else {
memset(base_ptr + symbol_table_offset, 0, fsst_serialized_symbol_table_size);
}
// Write the fsst symbol table
D_ASSERT(fsst_encoder);
memcpy(base_ptr + symbol_table_offset, &fsst_serialized_symbol_table[0], fsst_serialized_symbol_table_size);

Store<uint32_t>(NumericCast<uint32_t>(symbol_table_offset),
data_ptr_cast(&header_ptr->fsst_symbol_table_offset));
Expand Down

0 comments on commit d554f99

Please sign in to comment.