Skip to content

Commit

Permalink
fix up the scan portion after the compression rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
Tishj committed Jan 17, 2025
1 parent 22771bf commit 52d5424
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 82 deletions.
7 changes: 3 additions & 4 deletions src/include/duckdb/storage/compression/dict_fsst/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ enum class DictFSSTMode : uint8_t {

typedef struct {
uint32_t dict_size;
uint32_t dict_end;
uint32_t string_lengths_offset;
uint32_t string_lengths_width;
uint32_t dict_count;
uint32_t dictionary_indices_width;
DictFSSTMode mode;
uint8_t string_lengths_width;
uint8_t dictionary_indices_width;
uint32_t symbol_table_size;
} dict_fsst_compression_header_t;

enum class DictionaryAppendState : uint8_t {
Expand Down
33 changes: 18 additions & 15 deletions src/include/duckdb/storage/compression/dict_fsst/decompression.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ namespace dict_fsst {
//===--------------------------------------------------------------------===//
// Scan
//===--------------------------------------------------------------------===//
// FIXME: why is this StringScanState when we also define: `BufferHandle handle` ???
struct CompressedStringScanState : public StringScanState {
struct CompressedStringScanState : public SegmentScanState {
public:
explicit CompressedStringScanState(BufferHandle &&handle_p)
: StringScanState(), owned_handle(std::move(handle_p)), handle(owned_handle) {
CompressedStringScanState(ColumnSegment &segment, BufferHandle &&handle_p)
: segment(segment), owned_handle(std::move(handle_p)), handle(owned_handle) {
}
explicit CompressedStringScanState(BufferHandle &handle_p) : StringScanState(), owned_handle(), handle(handle_p) {
CompressedStringScanState(ColumnSegment &segment, BufferHandle &handle_p)
: segment(segment), owned_handle(), handle(handle_p) {
}

~CompressedStringScanState() override;

public:
void Initialize(ColumnSegment &segment, bool initialize_dictionary = true);
void Initialize(bool initialize_dictionary = true);
void ScanToFlatVector(Vector &result, idx_t result_offset, idx_t start, idx_t scan_count);
void ScanToDictionaryVector(ColumnSegment &segment, Vector &result, idx_t result_offset, idx_t start,
idx_t scan_count);
Expand All @@ -31,27 +32,29 @@ struct CompressedStringScanState : public StringScanState {
uint32_t GetStringLength(sel_t index);

public:
ColumnSegment &segment;
BufferHandle owned_handle;
optional_ptr<BufferHandle> handle;

bitpacking_width_t current_width;
DictFSSTMode mode;
idx_t dictionary_size;
uint32_t dict_count;
bitpacking_width_t dictionary_indices_width;
bitpacking_width_t string_lengths_width;

buffer_ptr<SelectionVector> sel_vec;
vector<uint32_t> string_lengths;
idx_t sel_vec_size = 0;
DictFSSTMode mode;

vector<uint32_t> string_lengths;

//! Start of the block (pointing to the dictionary_header)
data_ptr_t baseptr;
//! Start of the data (pointing to the start of the selection buffer)
data_ptr_t base_data;
data_ptr_t dict_ptr;
data_ptr_t dictionary_indices_ptr;
data_ptr_t string_lengths_ptr;
bitpacking_width_t string_lengths_width;
uint32_t dict_count;

buffer_ptr<Vector> dictionary;
idx_t dictionary_size;
StringDictionaryContainer dict;
idx_t block_size;

void *decoder = nullptr;
vector<unsigned char> decompress_buffer;
Expand Down
8 changes: 4 additions & 4 deletions src/storage/compression/dict_fsst.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ void DictFSSTCompressionStorage::FinalizeCompress(CompressionState &state_p) {
//===--------------------------------------------------------------------===//
unique_ptr<SegmentScanState> DictFSSTCompressionStorage::StringInitScan(ColumnSegment &segment) {
auto &buffer_manager = BufferManager::GetBufferManager(segment.db);
auto state = make_uniq<CompressedStringScanState>(buffer_manager.Pin(segment.block));
state->Initialize(segment, true);
auto state = make_uniq<CompressedStringScanState>(segment, buffer_manager.Pin(segment.block));
state->Initialize(true);
return std::move(state);
}

Expand Down Expand Up @@ -141,8 +141,8 @@ void DictFSSTCompressionStorage::StringScan(ColumnSegment &segment, ColumnScanSt
void DictFSSTCompressionStorage::StringFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id,
Vector &result, idx_t result_idx) {
// fetch a single row from the string segment
CompressedStringScanState scan_state(state.GetOrInsertHandle(segment));
scan_state.Initialize(segment, false);
CompressedStringScanState scan_state(segment, state.GetOrInsertHandle(segment));
scan_state.Initialize(false);
scan_state.ScanToFlatVector(result, result_idx, NumericCast<idx_t>(row_id), 1);
}

Expand Down
3 changes: 1 addition & 2 deletions src/storage/compression/dict_fsst/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ StringDictionaryContainer DictFSSTCompression::GetDictionary(ColumnSegment &segm
auto header_ptr = reinterpret_cast<dict_fsst_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset());
StringDictionaryContainer container;
container.size = Load<uint32_t>(data_ptr_cast(&header_ptr->dict_size));
container.end = Load<uint32_t>(data_ptr_cast(&header_ptr->dict_end));
container.end = container.size;
return container;
}

void DictFSSTCompression::SetDictionary(ColumnSegment &segment, BufferHandle &handle,
StringDictionaryContainer container) {
auto header_ptr = reinterpret_cast<dict_fsst_compression_header_t *>(handle.Ptr() + segment.GetBlockOffset());
Store<uint32_t>(container.size, data_ptr_cast(&header_ptr->dict_size));
Store<uint32_t>(container.end, data_ptr_cast(&header_ptr->dict_end));
}

// bool DictFSSTCompressionState::DryAppendToCurrentSegment(bool is_new, UnifiedVectorFormat &vdata, idx_t count,
Expand Down
72 changes: 41 additions & 31 deletions src/storage/compression/dict_fsst/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,38 +54,41 @@ idx_t DictFSSTCompressionState::Finalize() {
D_ASSERT(string_lengths_space == this->string_lengths_space);
}
#endif
auto total_size = DictFSSTCompression::DICTIONARY_HEADER_SIZE + dictionary_indices_space + string_lengths_space +
dictionary_offset;
if (is_fsst_encoded) {
total_size += symbol_table_size;

if (!is_fsst_encoded) {
symbol_table_size = 0;
}
const auto total_size = DictFSSTCompression::DICTIONARY_HEADER_SIZE + dictionary_indices_space +
string_lengths_space + dictionary_offset + symbol_table_size;

// calculate ptr and offsets
auto base_ptr = current_handle.Ptr();
auto header_ptr = reinterpret_cast<dict_fsst_compression_header_t *>(base_ptr);
auto offset_to_dictionary = AlignValue<idx_t>(DictFSSTCompression::DICTIONARY_HEADER_SIZE);
auto offset_to_dictionary_indices = offset_to_dictionary + dictionary_offset;
auto dictionary_dest = AlignValue<idx_t>(DictFSSTCompression::DICTIONARY_HEADER_SIZE);
auto symbol_table_dest = AlignValue<idx_t>(dictionary_dest + dictionary_offset);
auto string_lengths_dest = AlignValue<idx_t>(symbol_table_dest + symbol_table_size);
auto dictionary_indices_dest = AlignValue<idx_t>(string_lengths_dest + string_lengths_space);

header_ptr->mode = ConvertToMode(append_state);
header_ptr->symbol_table_size = symbol_table_size;
header_ptr->dict_size = dictionary_offset;
header_ptr->dict_count = dict_count;
header_ptr->dictionary_indices_width = dictionary_indices_width;
header_ptr->string_lengths_width = string_lengths_width;

// Write the symbol table
if (is_fsst_encoded) {
D_ASSERT(symbol_table_size != DConstants::INVALID_INDEX);
memcpy(base_ptr + offset_to_dictionary_indices, fsst_serialized_symbol_table.get(), symbol_table_size);
offset_to_dictionary_indices += symbol_table_size;
memcpy(base_ptr + symbol_table_dest, fsst_serialized_symbol_table.get(), symbol_table_size);
}
offset_to_dictionary_indices = AlignValue<idx_t>(offset_to_dictionary_indices);
auto string_lengths_offset = offset_to_dictionary_indices + dictionary_indices_space;
header_ptr->mode = ConvertToMode(append_state);

// Write compressed selection buffer
BitpackingPrimitives::PackBuffer<sel_t, false>(base_ptr + offset_to_dictionary_indices,
// Write the string lengths of the dictionary
BitpackingPrimitives::PackBuffer<uint32_t, false>(base_ptr + string_lengths_dest, string_lengths.data(), dict_count,
string_lengths_width);
// Write the dictionary indices (selection vector)
BitpackingPrimitives::PackBuffer<sel_t, false>(base_ptr + dictionary_indices_dest,
(sel_t *)(dictionary_indices.data()), tuple_count,
dictionary_indices_width);
BitpackingPrimitives::PackBuffer<uint32_t, false>(base_ptr + string_lengths_offset, string_lengths.data(),
dict_count, string_lengths_width);

// Store sizes and offsets in segment header
Store<uint32_t>(NumericCast<uint32_t>(string_lengths_offset), data_ptr_cast(&header_ptr->string_lengths_offset));
Store<uint32_t>(NumericCast<uint32_t>(dict_count), data_ptr_cast(&header_ptr->dict_count));
Store<uint32_t>((uint32_t)dictionary_indices_width, data_ptr_cast(&header_ptr->dictionary_indices_width));
Store<uint32_t>((uint32_t)string_lengths_width, data_ptr_cast(&header_ptr->string_lengths_width));

if (append_state != DictionaryAppendState::ENCODED_ALL_UNIQUE) {
D_ASSERT(dictionary_indices_width == BitpackingPrimitives::MinimumBitWidth(dict_count - 1));
Expand All @@ -107,9 +110,6 @@ idx_t DictFSSTCompressionState::Finalize() {
D_ASSERT(info.GetBlockSize() >= required_space);
#endif
D_ASSERT((uint64_t)*max_element(std::begin(dictionary_indices), std::end(dictionary_indices)) == dict_count - 1);

// Write the new dictionary with the updated "end".
DictFSSTCompression::SetDictionary(*current_segment, current_handle, current_dictionary);
return total_size;
}

Expand Down Expand Up @@ -189,6 +189,10 @@ void DictFSSTCompressionState::Flush(bool final) {
FlushEncodingBuffer();
}

if (!tuple_count) {
return;
}

current_segment->count = tuple_count;
current_dictionary.size = dictionary_offset;
current_dictionary.end = dictionary_offset;
Expand Down Expand Up @@ -391,6 +395,11 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form
idx_t lookup = DConstants::INVALID_INDEX;
auto &str = strings[idx];

//! GetRequiredSize rounds up to BITPACKING_ALGORITHM_GROUP_SIZE anyway,
//! so we only need to recalculate the indices space once per group instead of for every tuple
const bool recalculate_indices_space = ((tuple_count % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) == 0);
// const bool recalculate_indices_space = true;

switch (append_state) {
case DictionaryAppendState::NOT_ENCODED:
case DictionaryAppendState::REGULAR: {
Expand All @@ -401,8 +410,6 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form
lookup = 0;
}

const bool recalculate_indices_space =
((tuple_count + 1) % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) == 1;
if (append_state == DictionaryAppendState::REGULAR) {
if (lookup != DConstants::INVALID_INDEX) {
return AddLookup<DictionaryAppendState::REGULAR>(*this, lookup, recalculate_indices_space);
Expand Down Expand Up @@ -433,8 +440,6 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form
lookup = 0;
}

const bool recalculate_indices_space =
((tuple_count + 1) % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) == 1;
bool fits;
if (lookup != DConstants::INVALID_INDEX) {
fits = AddLookup<DictionaryAppendState::ENCODED>(*this, lookup, recalculate_indices_space);
Expand Down Expand Up @@ -486,6 +491,13 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form
}

// FIXME: can we compress directly to the segment? That would save a copy.
// I think yes:
// We can give the segment as the destination and limit the size;
// it will tell us when it can't fit everything.
// Worst case, we can just check whether the rest of the metadata fits when we remove the last string that it
// was able to encode. I believe 'duckdb_fsst_compress' tells us how many of the input strings it was able to
// compress, so we can work backwards from there to see how many strings actually fit (probably worst case
// ret-1 ??).
auto fsst_encoder = reinterpret_cast<duckdb_fsst_encoder_t *>(encoder);
auto res = duckdb_fsst_compress(fsst_encoder, input_string_lengths.size(), input_string_lengths.data(),
input_string_ptrs.data(), output_buffer_size, encoding_buffer.get(),
Expand All @@ -501,8 +513,6 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form
}
}

const bool recalculate_indices_space =
((tuple_count + 1) % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) == 1;
auto &string = encoded_input.data[i - encoded_input.offset];
return AddToDictionary<DictionaryAppendState::ENCODED_ALL_UNIQUE>(*this, string, recalculate_indices_space);
}
Expand Down Expand Up @@ -638,7 +648,7 @@ void DictFSSTCompressionState::Compress(Vector &scan_vector, idx_t count) {
}

void DictFSSTCompressionState::FinalizeCompress() {
throw NotImplementedException("TODO");
Flush(true);
}

} // namespace dict_fsst
Expand Down
60 changes: 34 additions & 26 deletions src/storage/compression/dict_fsst/decompression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ uint32_t CompressedStringScanState::GetStringLength(sel_t index) {
}

string_t CompressedStringScanState::FetchStringFromDict(Vector &result, int32_t dict_offset, uint32_t string_len) {
D_ASSERT(dict_offset >= 0 && dict_offset <= NumericCast<int32_t>(block_size));
D_ASSERT(dict_offset >= 0 && dict_offset <= NumericCast<int32_t>(segment.GetBlockManager().GetBlockSize()));
if (dict_offset == 0) {
return string_t(nullptr, 0);
}

// normal string: read string from this block
auto dict_end = baseptr + dict.end;
auto dict_pos = dict_end - dict_offset;
auto dict_pos = dict_ptr + dict_offset;

auto str_ptr = char_ptr_cast(dict_pos);
switch (mode) {
Expand All @@ -34,42 +33,49 @@ string_t CompressedStringScanState::FetchStringFromDict(Vector &result, int32_t
}
}

void CompressedStringScanState::Initialize(ColumnSegment &segment, bool initialize_dictionary) {
void CompressedStringScanState::Initialize(bool initialize_dictionary) {
baseptr = handle->Ptr() + segment.GetBlockOffset();

// Load header values
auto header_ptr = reinterpret_cast<dict_fsst_compression_header_t *>(baseptr);
auto string_lengths_offset = Load<uint32_t>(data_ptr_cast(&header_ptr->string_lengths_offset));
dict_count = Load<uint32_t>(data_ptr_cast(&header_ptr->dict_count));
current_width = (bitpacking_width_t)(Load<uint32_t>(data_ptr_cast(&header_ptr->dictionary_indices_width)));
string_lengths_width = (bitpacking_width_t)(Load<uint32_t>(data_ptr_cast(&header_ptr->string_lengths_width)));
string_lengths.resize(AlignValue<uint32_t, BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE>(dict_count));
auto string_lengths_size = BitpackingPrimitives::GetRequiredSize(dict_count, string_lengths_width);

if (segment.GetBlockOffset() + string_lengths_offset + string_lengths_size + header_ptr->dict_size >
segment.GetBlockManager().GetBlockSize()) {
throw IOException(
"Failed to scan dictionary string - index was out of range. Database file appears to be corrupted.");
}
string_lengths_ptr = baseptr + string_lengths_offset;
base_data = data_ptr_cast(baseptr + sizeof(dict_fsst_compression_header_t));

block_size = segment.GetBlockManager().GetBlockSize();

dict = DictFSSTCompression::GetDictionary(segment, *handle);
mode = header_ptr->mode;
if (mode >= DictFSSTMode::COUNT) {
throw FatalException("This block was written with a mode that is not recognized by this version, highest "
"available mode %d, found mode: %d",
static_cast<uint8_t>(DictFSSTMode::COUNT), static_cast<uint8_t>(mode));
}

dict_count = header_ptr->dict_count;
auto symbol_table_size = header_ptr->symbol_table_size;
dict = DictFSSTCompression::GetDictionary(segment, *handle);

dictionary_indices_width =
(bitpacking_width_t)(Load<uint8_t>(data_ptr_cast(&header_ptr->dictionary_indices_width)));
string_lengths_width = (bitpacking_width_t)(Load<uint8_t>(data_ptr_cast(&header_ptr->string_lengths_width)));

auto string_lengths_space = BitpackingPrimitives::GetRequiredSize(dict_count, string_lengths_width);
auto dictionary_indices_space =
BitpackingPrimitives::GetRequiredSize(segment.count.load(), dictionary_indices_width);

auto dictionary_dest = AlignValue<idx_t>(DictFSSTCompression::DICTIONARY_HEADER_SIZE);
auto symbol_table_dest = AlignValue<idx_t>(dictionary_dest + dict.size);
auto string_lengths_dest = AlignValue<idx_t>(symbol_table_dest + symbol_table_size);
auto dictionary_indices_dest = AlignValue<idx_t>(string_lengths_dest + string_lengths_space);

const auto total_space = segment.GetBlockOffset() + dictionary_indices_dest + dictionary_indices_space;
if (total_space > segment.GetBlockManager().GetBlockSize()) {
throw IOException(
"Failed to scan dictionary string - index was out of range. Database file appears to be corrupted.");
}
dict_ptr = data_ptr_cast(baseptr + dictionary_dest);
dictionary_indices_ptr = data_ptr_cast(baseptr + dictionary_indices_dest);
string_lengths_ptr = data_ptr_cast(baseptr + string_lengths_dest);

switch (mode) {
case DictFSSTMode::FSST_ONLY:
case DictFSSTMode::DICT_FSST: {
decoder = new duckdb_fsst_decoder_t;
auto symbol_table_location = baseptr + dict.end;
auto ret = duckdb_fsst_import(reinterpret_cast<duckdb_fsst_decoder_t *>(decoder), symbol_table_location);
auto ret = duckdb_fsst_import(reinterpret_cast<duckdb_fsst_decoder_t *>(decoder), baseptr + symbol_table_dest);
(void)(ret);
D_ASSERT(ret != 0); // FIXME: the old code set the decoder to nullptr instead, why???

Expand All @@ -80,6 +86,8 @@ void CompressedStringScanState::Initialize(ColumnSegment &segment, bool initiali
default:
break;
}

string_lengths.resize(AlignValue<uint32_t, BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE>(dict_count));
BitpackingPrimitives::UnPackBuffer<uint32_t>(data_ptr_cast(string_lengths.data()),
data_ptr_cast(string_lengths_ptr), dict_count, string_lengths_width);

Expand Down Expand Up @@ -120,10 +128,10 @@ const SelectionVector &CompressedStringScanState::GetSelVec(idx_t start, idx_t s
sel_vec = make_buffer<SelectionVector>(decompress_count);
}

data_ptr_t sel_buf_src = &base_data[((start - start_offset) * current_width) / 8];
data_ptr_t sel_buf_src = &dictionary_indices_ptr[((start - start_offset) * dictionary_indices_width) / 8];
sel_t *sel_vec_ptr = sel_vec->data();
BitpackingPrimitives::UnPackBuffer<sel_t>(data_ptr_cast(sel_vec_ptr), sel_buf_src, decompress_count,
current_width);
dictionary_indices_width);
return *sel_vec;
}
}
Expand Down

0 comments on commit 52d5424

Please sign in to comment.