diff --git a/src/include/duckdb/storage/table/column_data_checkpointer.hpp b/src/include/duckdb/storage/table/column_data_checkpointer.hpp index 5f8bb80cbb00..a5bd436bb822 100644 --- a/src/include/duckdb/storage/table/column_data_checkpointer.hpp +++ b/src/include/duckdb/storage/table/column_data_checkpointer.hpp @@ -18,10 +18,13 @@ struct TableScanOptions; //! Holds state related to a single column during compression struct ColumnDataCheckpointData { public: + //! Default constructor used when column data does not need to be checkpointed + ColumnDataCheckpointData() { + } ColumnDataCheckpointData(ColumnCheckpointState &checkpoint_state, ColumnData &col_data, DatabaseInstance &db, - RowGroup &row_group, bool has_changes, ColumnCheckpointInfo &checkpoint_info) + RowGroup &row_group, ColumnCheckpointInfo &checkpoint_info) : checkpoint_state(checkpoint_state), col_data(col_data), db(db), row_group(row_group), - has_changes(has_changes), checkpoint_info(checkpoint_info) { + checkpoint_info(checkpoint_info) { } public: @@ -32,13 +35,12 @@ struct ColumnDataCheckpointData { ColumnCheckpointState &GetCheckpointState(); DatabaseInstance &GetDatabase(); -public: - ColumnCheckpointState &checkpoint_state; - ColumnData &col_data; - DatabaseInstance &db; - RowGroup &row_group; - bool has_changes; - ColumnCheckpointInfo &checkpoint_info; +private: + optional_ptr<ColumnCheckpointState> checkpoint_state; + optional_ptr<ColumnData> col_data; + optional_ptr<DatabaseInstance> db; + optional_ptr<RowGroup> row_group; + optional_ptr<ColumnCheckpointInfo> checkpoint_info; }; struct CheckpointAnalyzeResult { diff --git a/src/storage/table/column_data.cpp b/src/storage/table/column_data.cpp index 80f95654d9ea..53e7c545976a 100644 --- a/src/storage/table/column_data.cpp +++ b/src/storage/table/column_data.cpp @@ -630,17 +630,6 @@ unique_ptr ColumnData::Checkpoint(RowGroup &row_group, Co ColumnDataCheckpointer checkpointer(states, GetDatabase(), row_group, checkpoint_info); checkpointer.Checkpoint(); checkpointer.FinalizeCheckpoint(); - - // reset the compression function - 
compression.reset(); - // replace the old tree with the new one - auto new_segments = checkpoint_state->new_tree.MoveSegments(); - auto l = data.Lock(); - for (auto &new_segment : new_segments) { - AppendSegment(l, std::move(new_segment.node)); - } - ClearUpdates(); - return checkpoint_state; } diff --git a/src/storage/table/column_data_checkpointer.cpp b/src/storage/table/column_data_checkpointer.cpp index 79edb57fb71a..13658077d2bf 100644 --- a/src/storage/table/column_data_checkpointer.cpp +++ b/src/storage/table/column_data_checkpointer.cpp @@ -10,30 +10,30 @@ namespace duckdb { //! ColumnDataCheckpointData CompressionFunction &ColumnDataCheckpointData::GetCompressionFunction(CompressionType compression_type) { - auto &db = col_data.GetDatabase(); - auto &column_type = col_data.type; + auto &db = col_data->GetDatabase(); + auto &column_type = col_data->type; auto &config = DBConfig::GetConfig(db); return *config.GetCompressionFunction(compression_type, column_type.InternalType()); } DatabaseInstance &ColumnDataCheckpointData::GetDatabase() { - return col_data.GetDatabase(); + return col_data->GetDatabase(); } const LogicalType &ColumnDataCheckpointData::GetType() const { - return col_data.type; + return col_data->type; } ColumnData &ColumnDataCheckpointData::GetColumnData() { - return col_data; + return *col_data; } RowGroup &ColumnDataCheckpointData::GetRowGroup() { - return row_group; + return *row_group; } ColumnCheckpointState &ColumnDataCheckpointData::GetCheckpointState() { - return checkpoint_state; + return *checkpoint_state; } //! 
ColumnDataCheckpointer @@ -281,8 +281,8 @@ void ColumnDataCheckpointer::WriteToDisk() { // Initialize the compression for the selected function D_ASSERT(analyze_result.size() == checkpoint_states.size()); - vector<ColumnDataCheckpointData> checkpoint_data; - vector<unique_ptr<CompressionState>> compression_states; + vector<ColumnDataCheckpointData> checkpoint_data(checkpoint_states.size()); + vector<unique_ptr<CompressionState>> compression_states(checkpoint_states.size()); for (idx_t i = 0; i < analyze_result.size(); i++) { if (!has_changes[i]) { continue; @@ -293,9 +293,9 @@ void ColumnDataCheckpointer::WriteToDisk() { auto &checkpoint_state = checkpoint_states[i]; auto &col_data = checkpoint_state.get().column_data; - checkpoint_data.emplace_back(checkpoint_state, col_data, col_data.GetDatabase(), row_group, has_changes[i], - checkpoint_info); - compression_states.push_back(function->init_compression(checkpoint_data[i], std::move(analyze_state))); + checkpoint_data[i] = + ColumnDataCheckpointData(checkpoint_state, col_data, col_data.GetDatabase(), row_group, checkpoint_info); + compression_states[i] = function->init_compression(checkpoint_data[i], std::move(analyze_state)); } // Scan over the existing segment + changes and compress the data @@ -395,6 +395,16 @@ void ColumnDataCheckpointer::FinalizeCheckpoint() { } else { WritePersistentSegments(state); } + + // reset the compression function + col_data.compression.reset(); + // replace the old tree with the new one + auto new_segments = state.new_tree.MoveSegments(); + auto l = col_data.data.Lock(); + for (auto &new_segment : new_segments) { + col_data.AppendSegment(l, std::move(new_segment.node)); + } + col_data.ClearUpdates(); } } diff --git a/src/storage/table/standard_column_data.cpp b/src/storage/table/standard_column_data.cpp index 6d1b5bd10310..6b6311c3d7d3 100644 --- a/src/storage/table/standard_column_data.cpp +++ b/src/storage/table/standard_column_data.cpp @@ -232,160 +232,29 @@ unique_ptr StandardColumnData::Checkpoint(RowGroup &row_g // to prevent reading the validity data immediately after it is 
checkpointed we first checkpoint the main column // this is necessary for concurrent checkpointing as due to the partial block manager checkpointed data might be // flushed to disk by a different thread than the one that wrote it, causing a data race - auto base_state = ColumnData::Checkpoint(row_group, checkpoint_info); - auto validity_state = validity.Checkpoint(row_group, checkpoint_info); + auto base_state = CreateCheckpointState(row_group, checkpoint_info.info.manager); + base_state->global_stats = BaseStatistics::CreateEmpty(type).ToUnique(); + auto validity_state_p = validity.CreateCheckpointState(row_group, checkpoint_info.info.manager); + validity_state_p->global_stats = BaseStatistics::CreateEmpty(validity.type).ToUnique(); - /* TODO: abstract this - - create base checkpoint state - create validity checkpoint state - - vector> checkpoint_states; - vector> column_segments; - - ColumnDataCheckpointer checkpointer(checkpoint_states); - - D_ASSERT(checkpoint_states.size() == column_segments.size()); - has_changes.reserve(checkpoint_states.size()); - for (idx_t i = 0; i < column_segments.size(); i++) { - auto &nodes = column_segments[i]; - has_changes.push_back(HasChanges(nodes)); - } - - bool any_has_changes = false; - for (auto &changes : has_changes) { - if (changes) { - any_has_changes = true; - } - } - if (!any_has_changes) { - return; - } - - for (idx_t i = 0; i < checkpoint_states.size(); i++) { - auto &changes = has_changes[i]; - if (!changes) { - continue; - } - auto &nodes = column_segments[i]; - CommitDropSegments(nodes); - } - - // vector>> compression_functions; (created as part of the - ColumnDataCheckpointer constructor); - vector> analyze_states; - // TODO: - // figure out the forced compression function - - for (idx_t i = 0; i < checkpoint_states.size(); i++) { - auto &changes = has_changes[i]; - if (!changes) { - continue; - } - - auto &functions = compression_functions[i]; - auto &states = analyze_states[i]; - auto &checkpoint_state = 
checkpoint_states[i]; - auto &coldata = checkpoint_state.coldata; - for (auto &func : functions) { - states.push_back(func.init_analyze(coldata, coldata.GetType()) - } - } - - D_ASSERT(!checkpoint_states.empty()); - auto &first_state = checkpoint_states[0]; - auto &coldata = first_state.coldata; - auto &first_nodes = column_segments[0]; - ScanSegments(coldata, first_nodes, [](Vector &input, input_count) { - for (idx_t i = 0; i < checkpoint_states.size(); i++) { - auto &changes = has_changes[i]; - if (!changes) { - continue; - } - - auto &functions = compression_functions[i]; - auto &states = analyze_states[i]; - for (idx_t j = 0; j < functions.size(); j++) { - auto &function = functions[j]; - auto &state = states[j]; - if (!state) { - continue; - } - if (!function.analyze(state, input, input_count)) { - function = nullptr; - state = nullptr; - } - } - } - }); - - vector> compression_function; - vector> compression_state; - - for (idx_t i = 0; i < checkpoint_states.size(); i++) { - auto &changes = has_changes[i]; - if (!changes) { - continue; - } - - TODO: - final analyze and prefer the forced compression method - followed by 'init_compression' to create the 'compression_state' for this coldata - } + auto &validity_state = *validity_state_p; + auto &checkpoint_state = base_state->Cast<StandardColumnCheckpointState>(); + checkpoint_state.validity_state = std::move(validity_state_p); - ScanSegments(coldata, first_nodes, [](Vector &input, idx_t input_count) { - for (idx_t i = 0; i < checkpoint_states.size(); i++) { - auto &changes = has_changes[i]; - if (!changes) { - continue; - } - - auto &state = compression_state[i]; - auto &function = compression_function[i]; - function.compress(state, input, input_count); - } - }); - - for (idx_t i = 0; i < checkpoint_states.size(); i++) { - auto &changes = has_changes[i]; - if (!changes) { - continue; - } - - auto &function = compression_function[i]; - function.compress_finalize(state); + auto &nodes = data.ReferenceSegments(); + if (nodes.empty()) { + // 
empty table: flush the empty list + return base_state; } + vector<reference<ColumnCheckpointState>> checkpoint_states; + checkpoint_states.emplace_back(checkpoint_state); + checkpoint_states.emplace_back(validity_state); - for (idx_t i = 0; i < checkpoint_states.size(); i++) { - auto &changes = has_changes[i]; - auto &coldata = checkpoint_states[i]; - - auto existing_nodes = coldata. - - if (!changes) { - WritePersistentSegments(coldata); - } else { - auto &coldata = checkpoint_states[i].coldata; - new_segments = checkpointer.MoveSegments(); - coldata.AppendSegments(new_segments); - coldata.ClearUpdates(); - } - - // reset the compression function - coldata.compression.reset(); - // replace the old tree with the new one - auto new_segments = checkpoint_state->new_tree.MoveSegments(); - for (auto &new_segment : new_segments) { - coldata.AppendSegment(l, std::move(new_segment.node)); - } - coldata.ClearUpdates(); - } - - END OF TODO */ + ColumnDataCheckpointer checkpointer(checkpoint_states, GetDatabase(), row_group, checkpoint_info); + checkpointer.Checkpoint(); + checkpointer.FinalizeCheckpoint(); - auto &checkpoint_state = base_state->Cast<StandardColumnCheckpointState>(); - checkpoint_state.validity_state = std::move(validity_state); return base_state; }