Skip to content

Commit

Permalink
conditional append flush
Browse files Browse the repository at this point in the history
  • Loading branch information
Tishj committed Jan 28, 2025
1 parent a71e60f commit 5d37854
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
22 changes: 16 additions & 6 deletions src/execution/operator/persistent/physical_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,11 @@ static idx_t PerformOnConflictAction(InsertLocalState &lstate, InsertGlobalState
return update_chunk.size();
}
auto &local_storage = LocalStorage::Get(context.client, data_table.db);
D_ASSERT(gstate.initialized);
data_table.FinalizeLocalAppend(gstate.append_state);
gstate.initialized = false;
if (gstate.initialized) {
// Flush the data first, it might be referenced by the Update
data_table.FinalizeLocalAppend(gstate.append_state);
gstate.initialized = false;
}
local_storage.Update(data_table, row_ids, set_columns, update_chunk);
return update_chunk.size();
}
Expand Down Expand Up @@ -661,31 +663,39 @@ SinkResultType PhysicalInsert::Sink(ExecutionContext &context, DataChunk &chunk,
gstate.initialized = true;
}

// FIXME: this is way too optimistic
// some tuples could be filtered out entirely and thus shouldn't be added to the return chunk.
if (action_type != OnConflictAction::NOTHING && return_chunk) {
// If the action is UPDATE or REPLACE, we will always create either an APPEND or an INSERT
// for NOTHING we don't create either an APPEND or an INSERT for the tuple
// so it should not be added to the RETURNING chunk
gstate.return_collection.Append(lstate.insert_chunk);
}

idx_t updated_tuples = OnConflictHandling(table, context, lstate, gstate);

if (action_type == OnConflictAction::NOTHING && return_chunk) {
// Because we didn't add to the RETURNING chunk yet
// we add the tuples that did not get filtered out now
gstate.return_collection.Append(lstate.insert_chunk);
}

gstate.insert_count += lstate.insert_chunk.size();
gstate.insert_count += updated_tuples;
storage.LocalAppend(gstate.append_state, context.client, lstate.insert_chunk, true);
if (action_type == OnConflictAction::UPDATE && lstate.update_chunk.size() != 0) {
// Flush the append so we can target the data we just appended with the update
storage.FinalizeLocalAppend(gstate.append_state);
gstate.initialized = false;
if (gstate.initialized) {
// Flush the append so we can target the data we just appended with the update
storage.FinalizeLocalAppend(gstate.append_state);
gstate.initialized = false;
}
(void)HandleInsertConflicts<true>(table, context, lstate, gstate, lstate.update_chunk, *this);
(void)HandleInsertConflicts<false>(table, context, lstate, gstate, lstate.update_chunk, *this);
// All of the tuples should have been turned into an update, leaving the chunk empty afterwards
D_ASSERT(lstate.update_chunk.size() == 0);
}
} else {
//! FIXME: can't we enable this by using a BatchedDataCollection ?
D_ASSERT(!return_chunk);
// parallel append
if (!lstate.local_collection) {
Expand Down
7 changes: 6 additions & 1 deletion test/sql/upsert/test_big_insert.test
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ SELECT COUNT(*) FILTER (WHERE j = 10) FROM integers

# All insert tuples cause a conflict on the same row
statement error
INSERT INTO integers(i,j) select i%5,i from range(5000) tbl(i) on conflict do update set j = excluded.j, k = excluded.i;
INSERT INTO integers(i,j) select
i%5,
i
from range(5000) tbl(i) on conflict do update set
j = excluded.j,
k = excluded.i;
----
Invalid Input Error: ON CONFLICT DO UPDATE can not update the same row twice in the same command. Ensure that no rows proposed for insertion within the same command have duplicate constrained values

Expand Down

0 comments on commit 5d37854

Please sign in to comment.