Skip to content

Commit

Permalink
Ensure checkpoint tasks complete on IO exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch committed Nov 26, 2024
1 parent aa2fe67 commit 3fb467d
Showing 1 changed file with 30 additions and 21 deletions.
51 changes: 30 additions & 21 deletions src/storage/table/row_group_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -982,29 +982,38 @@ void RowGroupCollection::Checkpoint(TableDataWriter &writer, TableStatistics &gl

VacuumState vacuum_state;
InitializeVacuumState(checkpoint_state, vacuum_state, segments);
// schedule tasks
idx_t total_vacuum_tasks = 0;
auto &config = DBConfig::GetConfig(writer.GetDatabase());
for (idx_t segment_idx = 0; segment_idx < segments.size(); segment_idx++) {
auto &entry = segments[segment_idx];
auto vacuum_tasks = ScheduleVacuumTasks(checkpoint_state, vacuum_state, segment_idx,
total_vacuum_tasks < config.options.max_vacuum_tasks);
if (vacuum_tasks) {
// vacuum tasks were scheduled - don't schedule a checkpoint task yet
total_vacuum_tasks++;
continue;
}
if (!entry.node) {
// row group was vacuumed/dropped - skip
continue;

try {
// schedule tasks
idx_t total_vacuum_tasks = 0;
auto &config = DBConfig::GetConfig(writer.GetDatabase());

for (idx_t segment_idx = 0; segment_idx < segments.size(); segment_idx++) {
auto &entry = segments[segment_idx];
auto vacuum_tasks = ScheduleVacuumTasks(checkpoint_state, vacuum_state, segment_idx,
total_vacuum_tasks < config.options.max_vacuum_tasks);
if (vacuum_tasks) {
// vacuum tasks were scheduled - don't schedule a checkpoint task yet
total_vacuum_tasks++;
continue;
}
if (!entry.node) {
// row group was vacuumed/dropped - skip
continue;
}
// schedule a checkpoint task for this row group
entry.node->MoveToCollection(*this, vacuum_state.row_start);
auto checkpoint_task = GetCheckpointTask(checkpoint_state, segment_idx);
checkpoint_state.executor.ScheduleTask(std::move(checkpoint_task));
vacuum_state.row_start += entry.node->count;
}
// schedule a checkpoint task for this row group
entry.node->MoveToCollection(*this, vacuum_state.row_start);
auto checkpoint_task = GetCheckpointTask(checkpoint_state, segment_idx);
checkpoint_state.executor.ScheduleTask(std::move(checkpoint_task));
vacuum_state.row_start += entry.node->count;
} catch (const std::exception &e) {
ErrorData error(e);
checkpoint_state.executor.PushError(std::move(error));
checkpoint_state.executor.WorkOnTasks(); // ensure all tasks have completed first before rethrowing
throw;
}
// all tasks have been scheduled - execute tasks until we are done
// all tasks have been successfully scheduled - execute tasks until we are done
checkpoint_state.executor.WorkOnTasks();

// no errors - finalize the row groups
Expand Down

0 comments on commit 3fb467d

Please sign in to comment.