Skip to content

Commit

Permalink
introduce StringData to hold the uncompressed string, together with a…
Browse files Browse the repository at this point in the history
…n optional encoded string
  • Loading branch information
Tishj committed Jan 8, 2025
1 parent 825a620 commit 7e704c1
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 35 deletions.
6 changes: 3 additions & 3 deletions src/include/duckdb/storage/compression/dict_fsst/analyze.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ struct DictFSSTAnalyzeState : public DictFSSTCompressionState {
explicit DictFSSTAnalyzeState(const CompressionInfo &info);

public:
optional_idx LookupString(string_t str) override;
void AddNewString(string_t str) override;
optional_idx LookupString(const string_t &str) override;
void AddNewString(const StringData &str) override;
void AddLookup(uint32_t lookup_result) override;
void AddNull() override;
idx_t RequiredSpace(bool new_string, idx_t string_size) override;
void Flush(bool final = false) override;
void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) override;
bool EncodeDictionary() override;
const string_t &GetString(const string_t *strings, idx_t index, idx_t raw_index) override;
StringData GetString(const string_t *strings, idx_t index, idx_t raw_index) override;
void Verify() override;

public:
Expand Down
26 changes: 23 additions & 3 deletions src/include/duckdb/storage/compression/dict_fsst/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,25 @@ struct DictFSSTCompression {
static void SetDictionary(ColumnSegment &segment, BufferHandle &handle, StringDictionaryContainer container);
};

struct StringData {
public:
explicit StringData(const string_t &string, optional_ptr<const string_t> encoded_string = nullptr)
: string(string), encoded_string(encoded_string) {
}

public:
const string_t &Get() const {
if (encoded_string) {
return *encoded_string;
}
return string;
}

public:
const string_t &string;
optional_ptr<const string_t> encoded_string;
};

//! Abstract class managing the compression state for size analysis or compression.
class DictFSSTCompressionState : public CompressionState {
public:
Expand All @@ -53,22 +72,23 @@ class DictFSSTCompressionState : public CompressionState {
// Should verify the State
virtual void Verify() = 0;
// Performs a lookup of str, storing the result internally
virtual optional_idx LookupString(string_t str) = 0;
virtual optional_idx LookupString(const string_t &str) = 0;
// Add the most recently looked up str to compression state
virtual void AddLookup(uint32_t lookup_result) = 0;
// Add string to the state that is known to not be seen yet
virtual void AddNewString(string_t str) = 0;
virtual void AddNewString(const StringData &str) = 0;
// Add a null value to the compression state
virtual void AddNull() = 0;
virtual idx_t RequiredSpace(bool new_string, idx_t string_size) = 0;
// Flush the segment to disk if compressing or reset the counters if analyzing
virtual void Flush(bool final = false) = 0;
virtual void UpdateStats(UnifiedVectorFormat &input, idx_t count) {/* no-op */};
// Process the strings of the vector if necessary
virtual void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) = 0;
// Encode the dictionary with FSST, return false if we decided not to encode
virtual bool EncodeDictionary() = 0;
// Retrieve the string given the indices
virtual const string_t &GetString(const string_t *strings, idx_t index, idx_t raw_index) = 0;
virtual StringData GetString(const string_t *strings, idx_t index, idx_t raw_index) = 0;

private:
bool DryAppendToCurrentSegment(bool is_new, UnifiedVectorFormat &vdata, idx_t count, idx_t index, idx_t raw_index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ struct DictFSSTCompressionCompressState : public DictFSSTCompressionState {
public:
void CreateEmptySegment(idx_t row_start);
void Verify() override;
optional_idx LookupString(string_t str) override;
void AddNewString(string_t str) override;
optional_idx LookupString(const string_t &str) override;
void AddNewString(const StringData &str) override;
void AddNull() override;
void AddLookup(uint32_t lookup_result) override;
idx_t RequiredSpace(bool new_string, idx_t string_size) override;
void Flush(bool final = false) override;
void EncodeInputStrings(UnifiedVectorFormat &input, idx_t count) override;
bool EncodeDictionary() override;
const string_t &GetString(const string_t *strings, idx_t index, idx_t raw_index) override;
StringData GetString(const string_t *strings, idx_t index, idx_t raw_index) override;
idx_t Finalize();

public:
Expand Down
11 changes: 7 additions & 4 deletions src/storage/compression/dict_fsst/analyze.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ DictFSSTAnalyzeState::DictFSSTAnalyzeState(const CompressionInfo &info)
current_dict_size(0), current_width(0), next_width(0) {
}

optional_idx DictFSSTAnalyzeState::LookupString(string_t str) {
optional_idx DictFSSTAnalyzeState::LookupString(const string_t &str) {
return current_set.count(str) ? 1 : optional_idx();
}

void DictFSSTAnalyzeState::AddNewString(string_t str) {
void DictFSSTAnalyzeState::AddNewString(const StringData &string_data) {
D_ASSERT(!string_data.encoded_string);
auto &str = string_data.string;

current_tuple_count++;
current_unique_count++;
current_dict_size += str.GetSize();
Expand Down Expand Up @@ -40,8 +43,8 @@ bool DictFSSTAnalyzeState::EncodeDictionary() {
return false;
}

const string_t &DictFSSTAnalyzeState::GetString(const string_t *strings, idx_t index, idx_t raw_index) {
return strings[index];
StringData DictFSSTAnalyzeState::GetString(const string_t *strings, idx_t index, idx_t raw_index) {
return StringData(strings[index]);
}

idx_t DictFSSTAnalyzeState::RequiredSpace(bool new_string, idx_t string_size) {
Expand Down
17 changes: 10 additions & 7 deletions src/storage/compression/dict_fsst/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ DictFSSTCompressionState::~DictFSSTCompressionState() {
bool DictFSSTCompressionState::DryAppendToCurrentSegment(bool is_new, UnifiedVectorFormat &vdata, idx_t count,
idx_t index, idx_t raw_index) {
auto strings = vdata.GetData<string_t>(vdata);
reference<const string_t> str = GetString(strings, index, raw_index);
auto required_space = RequiredSpace(is_new, str.get().GetSize());
auto string_data = GetString(strings, index, raw_index);
auto &str = string_data.Get();
auto required_space = RequiredSpace(is_new, str.GetSize());

auto block_size = info.GetBlockSize();

Expand All @@ -60,8 +61,8 @@ bool DictFSSTCompressionState::DryAppendToCurrentSegment(bool is_new, UnifiedVec
//! Decide whether or not to encode the dictionary
if (EncodeDictionary()) {
EncodeInputStrings(vdata, count);
str = GetString(strings, index, raw_index);
required_space = RequiredSpace(is_new, str.get().GetSize());
auto encoded_string = GetString(strings, index, raw_index);
required_space = RequiredSpace(is_new, encoded_string.Get().GetSize());
if (required_space > block_size) {
//! Even after encoding the dictionary, we can't add this string
return false;
Expand Down Expand Up @@ -97,9 +98,10 @@ bool DictFSSTCompressionState::UpdateState(Vector &scan_vector, idx_t count) {
optional_idx lookup_result;
auto row_is_valid = vdata.validity.RowIsValid(idx);

reference<const string_t> str = GetString(data, idx, i);
auto string_data = GetString(data, idx, i);
if (row_is_valid) {
string_size = str.get().GetSize();
auto &str = string_data.Get();
string_size = str.GetSize();
if (string_size >= StringUncompressed::GetStringBlockLimit(info.GetBlockSize())) {
// Big strings not implemented for dictionary compression
return false;
Expand All @@ -124,7 +126,8 @@ bool DictFSSTCompressionState::UpdateState(Vector &scan_vector, idx_t count) {
} else if (lookup_result.IsValid()) {
AddLookup(UnsafeNumericCast<uint32_t>(lookup_result.GetIndex()));
} else {
AddNewString(str);
auto string = GetString(data, idx, i);
AddNewString(string);
}

Verify();
Expand Down
51 changes: 36 additions & 15 deletions src/storage/compression/dict_fsst/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,16 @@ void DictFSSTCompressionCompressState::Verify() {
D_ASSERT(current_segment->count == selection_buffer.size());
D_ASSERT(DictFSSTCompression::HasEnoughSpace(current_segment->count.load(), index_buffer.size(),
current_dictionary.size, current_width, info.GetBlockSize()));
D_ASSERT(current_dictionary.end == info.GetBlockSize());

#ifdef DEBUG
if (append_state != DictionaryAppendState::ENCODED) {
D_ASSERT(current_dictionary.end == info.GetBlockSize());
}
#endif
D_ASSERT(index_buffer.size() == current_string_map.size() + 1); // +1 is for null value
}

optional_idx DictFSSTCompressionCompressState::LookupString(string_t str) {
optional_idx DictFSSTCompressionCompressState::LookupString(const string_t &str) {
auto search = current_string_map.find(str);
auto has_result = search != current_string_map.end();

Expand All @@ -65,9 +70,11 @@ optional_idx DictFSSTCompressionCompressState::LookupString(string_t str) {
return search->second;
}

void DictFSSTCompressionCompressState::AddNewString(string_t str) {
UncompressedStringStorage::UpdateStringStats(current_segment->stats, str);
void DictFSSTCompressionCompressState::AddNewString(const StringData &string_data) {
//! Update the stats using the uncompressed string always!
UncompressedStringStorage::UpdateStringStats(current_segment->stats, string_data.string);

auto &str = append_state == DictionaryAppendState::ENCODED ? *string_data.encoded_string : string_data.string;
// Copy string to dict
// New entries are added to the start (growing backwards)
// [............xxxxooooooooo]
Expand All @@ -78,7 +85,11 @@ void DictFSSTCompressionCompressState::AddNewString(string_t str) {
auto dict_pos = current_end_ptr - current_dictionary.size;
memcpy(dict_pos, str.GetData(), str.GetSize());
current_dictionary.Verify(info.GetBlockSize());
D_ASSERT(current_dictionary.end == info.GetBlockSize());
#ifdef DEBUG
if (append_state != DictionaryAppendState::ENCODED) {
D_ASSERT(current_dictionary.end == info.GetBlockSize());
}
#endif

// Update buffers and map
index_buffer.push_back(current_dictionary.size);
Expand Down Expand Up @@ -170,7 +181,7 @@ void DictFSSTCompressionCompressState::EncodeInputStrings(UnifiedVectorFormat &i
for (idx_t i = 0; i < count; i++) {
uint32_t size = UnsafeNumericCast<uint32_t>(compressed_sizes[i]);
string_t encoded_string((const char *)compressed_ptrs[i], size); // NOLINT;
strings.push_back(heap.AddString(std::move(encoded_string)));
strings.push_back(heap.AddBlob(std::move(encoded_string)));
}
}

Expand All @@ -186,7 +197,10 @@ bool DictFSSTCompressionCompressState::EncodeDictionary() {
data_ptr_t last_start = current_end_ptr;
//! We could use the 'current_string_map' but this won't be in-order
// and we want to preserve the order of the dictionary after rewriting
for (auto offset : index_buffer) {

// Skip index 0, that's reserved for NULL
for (idx_t i = 1; i < index_buffer.size(); i++) {
auto offset = index_buffer[i];
auto start = current_end_ptr - offset;
size_t size = UnsafeNumericCast<size_t>(last_start - start);
fsst_string_sizes.push_back(size);
Expand All @@ -195,7 +209,7 @@ bool DictFSSTCompressionCompressState::EncodeDictionary() {
}

// Create the encoder
auto string_count = index_buffer.size();
auto string_count = index_buffer.size() - 1;
encoder = duckdb_fsst_create(string_count, &fsst_string_sizes[0], &fsst_string_ptrs[0], 0);
auto fsst_encoder = (duckdb_fsst_encoder_t *)(encoder);

Expand Down Expand Up @@ -225,10 +239,11 @@ bool DictFSSTCompressionCompressState::EncodeDictionary() {
auto &start = compressed_ptrs[i];
auto &size = compressed_sizes[i];
offset += size;
index_buffer[i] = offset;
// Skip index 0, reserved for NULL
index_buffer[i + 1] = offset;
auto dest = current_end_ptr - offset;
memcpy(dest, start, size);
string_t dictionary_string((const char *)start, UnsafeNumericCast<uint32_t>(size)); // NOLINT
string_t dictionary_string((const char *)dest, UnsafeNumericCast<uint32_t>(size)); // NOLINT
current_string_map.insert({dictionary_string, i});
}
current_dictionary.size = offset;
Expand All @@ -237,17 +252,23 @@ bool DictFSSTCompressionCompressState::EncodeDictionary() {
return true;
}

const string_t &DictFSSTCompressionCompressState::GetString(const string_t *strings, idx_t index, idx_t raw_index) {
if (append_state != DictionaryAppendState::ENCODED) {
return strings[index];
StringData DictFSSTCompressionCompressState::GetString(const string_t *strings, idx_t index, idx_t raw_index) {
StringData result(strings[index]);
if (append_state == DictionaryAppendState::ENCODED) {
result.encoded_string = encoded_input.input_data[raw_index];
}
return encoded_input.input_data[raw_index];
return result;
}

idx_t DictFSSTCompressionCompressState::Finalize() {
auto &buffer_manager = BufferManager::GetBufferManager(checkpoint_data.GetDatabase());
auto handle = buffer_manager.Pin(current_segment->block);
D_ASSERT(current_dictionary.end == info.GetBlockSize());

#ifdef DEBUG
if (append_state != DictionaryAppendState::ENCODED) {
D_ASSERT(current_dictionary.end == info.GetBlockSize());
}
#endif
const bool is_fsst_encoded = append_state == DictionaryAppendState::ENCODED;

// calculate sizes
Expand Down

0 comments on commit 7e704c1

Please sign in to comment.