Skip to content

Commit

Permalink
checkpoint base+validity together
Browse files Browse the repository at this point in the history
  • Loading branch information
Tishj committed Jan 6, 2025
1 parent 0c6db17 commit 989b30e
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 180 deletions.
20 changes: 11 additions & 9 deletions src/include/duckdb/storage/table/column_data_checkpointer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ struct TableScanOptions;
//! Holds state related to a single column during compression
struct ColumnDataCheckpointData {
public:
//! Default constructor used when column data does not need to be checkpointed
ColumnDataCheckpointData() {
}
ColumnDataCheckpointData(ColumnCheckpointState &checkpoint_state, ColumnData &col_data, DatabaseInstance &db,
RowGroup &row_group, bool has_changes, ColumnCheckpointInfo &checkpoint_info)
RowGroup &row_group, ColumnCheckpointInfo &checkpoint_info)
: checkpoint_state(checkpoint_state), col_data(col_data), db(db), row_group(row_group),
has_changes(has_changes), checkpoint_info(checkpoint_info) {
checkpoint_info(checkpoint_info) {
}

public:
Expand All @@ -32,13 +35,12 @@ struct ColumnDataCheckpointData {
ColumnCheckpointState &GetCheckpointState();
DatabaseInstance &GetDatabase();

public:
ColumnCheckpointState &checkpoint_state;
ColumnData &col_data;
DatabaseInstance &db;
RowGroup &row_group;
bool has_changes;
ColumnCheckpointInfo &checkpoint_info;
private:
optional_ptr<ColumnCheckpointState> checkpoint_state;
optional_ptr<ColumnData> col_data;
optional_ptr<DatabaseInstance> db;
optional_ptr<RowGroup> row_group;
optional_ptr<ColumnCheckpointInfo> checkpoint_info;
};

struct CheckpointAnalyzeResult {
Expand Down
11 changes: 0 additions & 11 deletions src/storage/table/column_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -630,17 +630,6 @@ unique_ptr<ColumnCheckpointState> ColumnData::Checkpoint(RowGroup &row_group, Co
ColumnDataCheckpointer checkpointer(states, GetDatabase(), row_group, checkpoint_info);
checkpointer.Checkpoint();
checkpointer.FinalizeCheckpoint();

// reset the compression function
compression.reset();
// replace the old tree with the new one
auto new_segments = checkpoint_state->new_tree.MoveSegments();
auto l = data.Lock();
for (auto &new_segment : new_segments) {
AppendSegment(l, std::move(new_segment.node));
}
ClearUpdates();

return checkpoint_state;
}

Expand Down
34 changes: 22 additions & 12 deletions src/storage/table/column_data_checkpointer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,30 @@ namespace duckdb {
//! ColumnDataCheckpointData

CompressionFunction &ColumnDataCheckpointData::GetCompressionFunction(CompressionType compression_type) {
auto &db = col_data.GetDatabase();
auto &column_type = col_data.type;
auto &db = col_data->GetDatabase();
auto &column_type = col_data->type;
auto &config = DBConfig::GetConfig(db);
return *config.GetCompressionFunction(compression_type, column_type.InternalType());
}

DatabaseInstance &ColumnDataCheckpointData::GetDatabase() {
return col_data.GetDatabase();
return col_data->GetDatabase();
}

const LogicalType &ColumnDataCheckpointData::GetType() const {
return col_data.type;
return col_data->type;
}

ColumnData &ColumnDataCheckpointData::GetColumnData() {
return col_data;
return *col_data;
}

RowGroup &ColumnDataCheckpointData::GetRowGroup() {
return row_group;
return *row_group;
}

ColumnCheckpointState &ColumnDataCheckpointData::GetCheckpointState() {
return checkpoint_state;
return *checkpoint_state;
}

//! ColumnDataCheckpointer
Expand Down Expand Up @@ -281,8 +281,8 @@ void ColumnDataCheckpointer::WriteToDisk() {

// Initialize the compression for the selected function
D_ASSERT(analyze_result.size() == checkpoint_states.size());
vector<ColumnDataCheckpointData> checkpoint_data;
vector<unique_ptr<CompressionState>> compression_states;
vector<ColumnDataCheckpointData> checkpoint_data(checkpoint_states.size());
vector<unique_ptr<CompressionState>> compression_states(checkpoint_states.size());
for (idx_t i = 0; i < analyze_result.size(); i++) {
if (!has_changes[i]) {
continue;
Expand All @@ -293,9 +293,9 @@ void ColumnDataCheckpointer::WriteToDisk() {
auto &checkpoint_state = checkpoint_states[i];
auto &col_data = checkpoint_state.get().column_data;

checkpoint_data.emplace_back(checkpoint_state, col_data, col_data.GetDatabase(), row_group, has_changes[i],
checkpoint_info);
compression_states.push_back(function->init_compression(checkpoint_data[i], std::move(analyze_state)));
checkpoint_data[i] =
ColumnDataCheckpointData(checkpoint_state, col_data, col_data.GetDatabase(), row_group, checkpoint_info);
compression_states[i] = function->init_compression(checkpoint_data[i], std::move(analyze_state));
}

// Scan over the existing segment + changes and compress the data
Expand Down Expand Up @@ -395,6 +395,16 @@ void ColumnDataCheckpointer::FinalizeCheckpoint() {
} else {
WritePersistentSegments(state);
}

// reset the compression function
col_data.compression.reset();
// replace the old tree with the new one
auto new_segments = state.new_tree.MoveSegments();
auto l = col_data.data.Lock();
for (auto &new_segment : new_segments) {
col_data.AppendSegment(l, std::move(new_segment.node));
}
col_data.ClearUpdates();
}
}

Expand Down
165 changes: 17 additions & 148 deletions src/storage/table/standard_column_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,160 +232,29 @@ unique_ptr<ColumnCheckpointState> StandardColumnData::Checkpoint(RowGroup &row_g
// to prevent reading the validity data immediately after it is checkpointed we first checkpoint the main column
// this is necessary for concurrent checkpointing as due to the partial block manager checkpointed data might be
// flushed to disk by a different thread than the one that wrote it, causing a data race
auto base_state = ColumnData::Checkpoint(row_group, checkpoint_info);
auto validity_state = validity.Checkpoint(row_group, checkpoint_info);
auto base_state = CreateCheckpointState(row_group, checkpoint_info.info.manager);
base_state->global_stats = BaseStatistics::CreateEmpty(type).ToUnique();
auto validity_state_p = validity.CreateCheckpointState(row_group, checkpoint_info.info.manager);
validity_state_p->global_stats = BaseStatistics::CreateEmpty(validity.type).ToUnique();

/* TODO: abstract this
create base checkpoint state
create validity checkpoint state
vector<reference<ColumnCheckpointState>> checkpoint_states;
vector<reference<column_segment_vector_t>> column_segments;
ColumnDataCheckpointer checkpointer(checkpoint_states);
D_ASSERT(checkpoint_states.size() == column_segments.size());
has_changes.reserve(checkpoint_states.size());
for (idx_t i = 0; i < column_segments.size(); i++) {
auto &nodes = column_segments[i];
has_changes.push_back(HasChanges(nodes));
}
bool any_has_changes = false;
for (auto &changes : has_changes) {
if (changes) {
any_has_changes = true;
}
}
if (!any_has_changes) {
return;
}
for (idx_t i = 0; i < checkpoint_states.size(); i++) {
auto &changes = has_changes[i];
if (!changes) {
continue;
}
auto &nodes = column_segments[i];
CommitDropSegments(nodes);
}
// vector<vector<reference<const CompressionFunction>>> compression_functions; (created as part of the
ColumnDataCheckpointer constructor);
vector<vector<unique_ptr<AnalyzeState>> analyze_states;
// TODO:
// figure out the forced compression function
for (idx_t i = 0; i < checkpoint_states.size(); i++) {
auto &changes = has_changes[i];
if (!changes) {
continue;
}
auto &functions = compression_functions[i];
auto &states = analyze_states[i];
auto &checkpoint_state = checkpoint_states[i];
auto &coldata = checkpoint_state.coldata;
for (auto &func : functions) {
states.push_back(func.init_analyze(coldata, coldata.GetType())
}
}
D_ASSERT(!checkpoint_states.empty());
auto &first_state = checkpoint_states[0];
auto &coldata = first_state.coldata;
auto &first_nodes = column_segments[0];
ScanSegments(coldata, first_nodes, [](Vector &input, input_count) {
for (idx_t i = 0; i < checkpoint_states.size(); i++) {
auto &changes = has_changes[i];
if (!changes) {
continue;
}
auto &functions = compression_functions[i];
auto &states = analyze_states[i];
for (idx_t j = 0; j < functions.size(); j++) {
auto &function = functions[j];
auto &state = states[j];
if (!state) {
continue;
}
if (!function.analyze(state, input, input_count)) {
function = nullptr;
state = nullptr;
}
}
}
});
vector<reference<const CompressionFunction>> compression_function;
vector<unique_ptr<CompressionState>> compression_state;
for (idx_t i = 0; i < checkpoint_states.size(); i++) {
auto &changes = has_changes[i];
if (!changes) {
continue;
}
TODO:
final analyze and prefer the forced compression method
followed by 'init_compression' to create the 'compression_state' for this coldata
}
auto &validity_state = *validity_state_p;
auto &checkpoint_state = base_state->Cast<StandardColumnCheckpointState>();
checkpoint_state.validity_state = std::move(validity_state_p);

ScanSegments(coldata, first_nodes, [](Vector &input, idx_t input_count) {
for (idx_t i = 0; i < checkpoint_states.size(); i++) {
auto &changes = has_changes[i];
if (!changes) {
continue;
}
auto &state = compression_state[i];
auto &function = compression_function[i];
function.compress(state, input, input_count);
}
});
for (idx_t i = 0; i < checkpoint_states.size(); i++) {
auto &changes = has_changes[i];
if (!changes) {
continue;
}
auto &function = compression_function[i];
function.compress_finalize(state);
auto &nodes = data.ReferenceSegments();
if (nodes.empty()) {
// empty table: flush the empty list
return base_state;
}

vector<reference<ColumnCheckpointState>> checkpoint_states;
checkpoint_states.emplace_back(checkpoint_state);
checkpoint_states.emplace_back(validity_state);

for (idx_t i = 0; i < checkpoint_states.size(); i++) {
auto &changes = has_changes[i];
auto &coldata = checkpoint_states[i];
auto existing_nodes = coldata.
if (!changes) {
WritePersistentSegments(coldata);
} else {
auto &coldata = checkpoint_states[i].coldata;
new_segments = checkpointer.MoveSegments();
coldata.AppendSegments(new_segments);
coldata.ClearUpdates();
}
// reset the compression function
coldata.compression.reset();
// replace the old tree with the new one
auto new_segments = checkpoint_state->new_tree.MoveSegments();
for (auto &new_segment : new_segments) {
coldata.AppendSegment(l, std::move(new_segment.node));
}
coldata.ClearUpdates();
}
END OF TODO */
ColumnDataCheckpointer checkpointer(checkpoint_states, GetDatabase(), row_group, checkpoint_info);
checkpointer.Checkpoint();
checkpointer.FinalizeCheckpoint();

auto &checkpoint_state = base_state->Cast<StandardColumnCheckpointState>();
checkpoint_state.validity_state = std::move(validity_state);
return base_state;
}

Expand Down

0 comments on commit 989b30e

Please sign in to comment.