Skip to content

Commit

Permalink
Merge branch 'catalog_set_cleanup' into dependency_set
Browse files Browse the repository at this point in the history
  • Loading branch information
Tishj committed Oct 28, 2023
2 parents 1d5af1d + 21aced2 commit 7e0e91c
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 69 deletions.
28 changes: 28 additions & 0 deletions src/catalog/catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,34 @@ string CatalogEntry::ToSQL() const {
throw InternalException("Unsupported catalog type for ToSQL()");
}

void CatalogEntry::SetChild(unique_ptr<CatalogEntry> child_p) {
child = std::move(child_p);
if (child) {
child->parent = this;
}
}

unique_ptr<CatalogEntry> CatalogEntry::TakeChild() {
if (child) {
child->parent = nullptr;
}
return std::move(child);
}

bool CatalogEntry::HasChild() const {
return child != nullptr;
}
bool CatalogEntry::HasParent() const {
return parent != nullptr;
}

CatalogEntry &CatalogEntry::Child() {
return *child;
}
optional_ptr<CatalogEntry> CatalogEntry::Parent() {
return parent;
}

Catalog &CatalogEntry::ParentCatalog() {
throw InternalException("CatalogEntry::ParentCatalog called on catalog entry without catalog");
}
Expand Down
74 changes: 37 additions & 37 deletions src/catalog/catalog_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ class EntryDropper {
public:
//! Both constructor and destructor are privates because they should only be called by DropEntryDependencies
explicit EntryDropper(EntryIndex &entry_index_p) : entry_index(entry_index_p) {
old_deleted = entry_index.GetEntry()->deleted;
old_deleted = entry_index.GetEntry().deleted;
}

~EntryDropper() {
entry_index.GetEntry()->deleted = old_deleted;
entry_index.GetEntry<true>().deleted = old_deleted;
}

private:
Expand Down Expand Up @@ -61,9 +61,8 @@ void CatalogSet::PutEntry(EntryIndex index, unique_ptr<CatalogEntry> catalog_ent
if (entry == entries.end()) {
throw InternalException("Entry with entry index \"%llu\" does not exist", index.GetIndex());
}
catalog_entry->child = std::move(entry->second.entry);
catalog_entry->child->parent = catalog_entry.get();
entry->second.entry = std::move(catalog_entry);
catalog_entry->SetChild(entry->second.TakeEntry());
entry->second.SetEntry(std::move(catalog_entry));
}

bool IsDependencyEntry(CatalogEntry &entry) {
Expand Down Expand Up @@ -130,7 +129,7 @@ bool CatalogSet::CreateEntry(CatalogTransaction transaction, const string &name,
PutMapping(transaction, name, std::move(entry_index));
} else {
index = mapping_value->index.GetIndex();
auto &current = *mapping_value->index.GetEntry();
auto &current = mapping_value->index.GetEntry();
// if it does, we have to check version numbers
if (HasConflict(transaction, current.timestamp)) {
// current version has been written to by a currently active
Expand All @@ -150,7 +149,7 @@ bool CatalogSet::CreateEntry(CatalogTransaction transaction, const string &name,
// push the old entry in the undo buffer for this transaction
if (transaction.transaction) {
auto &dtransaction = transaction.transaction->Cast<DuckTransaction>();
dtransaction.PushCatalogEntry(*value_ptr->child);
dtransaction.PushCatalogEntry(value_ptr->Child());
}
return true;
}
Expand All @@ -161,7 +160,7 @@ bool CatalogSet::CreateEntry(ClientContext &context, const string &name, unique_
}

optional_ptr<CatalogEntry> CatalogSet::GetEntryInternal(CatalogTransaction transaction, EntryIndex &entry_index) {
auto &catalog_entry = *entry_index.GetEntry();
auto &catalog_entry = entry_index.GetEntry();
// if it does: we have to retrieve the entry and to check version numbers
if (HasConflict(transaction, catalog_entry.timestamp)) {
// current version has been written to by a currently active
Expand Down Expand Up @@ -235,7 +234,7 @@ bool CatalogSet::AlterEntry(CatalogTransaction transaction, const string &name,
if (value->name != original_name) {
auto mapping_value = GetMapping(transaction, value->name);
if (mapping_value && !mapping_value->deleted) {
auto &original_entry = GetEntryForTransaction(transaction, *mapping_value->index.GetEntry());
auto &original_entry = GetEntryForTransaction(transaction, mapping_value->index.GetEntry());
if (!original_entry.deleted) {
entry->UndoAlter(context, alter_info);
string rename_err_msg =
Expand Down Expand Up @@ -267,7 +266,7 @@ bool CatalogSet::AlterEntry(CatalogTransaction transaction, const string &name,
// push the old entry in the undo buffer for this transaction
if (transaction.transaction) {
auto &dtransaction = transaction.transaction->Cast<DuckTransaction>();
dtransaction.PushCatalogEntry(*new_entry->child, stream.GetData(), stream.GetPosition());
dtransaction.PushCatalogEntry(new_entry->Child(), stream.GetData(), stream.GetPosition());
}

// Check the dependency manager to verify that there are no conflicting dependencies with this alter
Expand All @@ -287,7 +286,7 @@ void CatalogSet::DropEntryDependencies(CatalogTransaction transaction, EntryInde
EntryDropper dropper(entry_index);

// To correctly delete the object and its dependencies, it temporarily is set to deleted.
entry_index.GetEntry()->deleted = true;
entry_index.GetEntry().deleted = true;

// check any dependencies of this object
D_ASSERT(entry.ParentCatalog().IsDuckCatalog());
Expand Down Expand Up @@ -315,7 +314,7 @@ void CatalogSet::DropEntryInternal(CatalogTransaction transaction, EntryIndex en
// push the old entry in the undo buffer for this transaction
if (transaction.transaction) {
auto &dtransaction = transaction.transaction->Cast<DuckTransaction>();
dtransaction.PushCatalogEntry(*value_ptr->child);
dtransaction.PushCatalogEntry(value_ptr->Child());
}
}

Expand Down Expand Up @@ -350,23 +349,27 @@ DuckCatalog &CatalogSet::GetCatalog() {

void CatalogSet::CleanupEntry(CatalogEntry &catalog_entry) {
// destroy the backed up entry: it is no longer required
D_ASSERT(catalog_entry.parent);

auto parent_p = catalog_entry.Parent();
D_ASSERT(parent_p);
lock_guard<mutex> write_lock(catalog.GetWriteLock());
lock_guard<mutex> lock(catalog_lock);
auto parent = catalog_entry.parent;
parent->child = std::move(catalog_entry.child);
if (parent->deleted && !parent->child && !parent->parent) {
auto mapping_entry = mapping.find(parent->name);
parent_p = catalog_entry.Parent();
auto &parent = *parent_p;
parent.SetChild(catalog_entry.TakeChild());
if (parent.deleted && !parent.HasChild() && !parent.HasParent()) {
auto mapping_entry = mapping.find(parent.name);
D_ASSERT(mapping_entry != mapping.end());
auto &entry = mapping_entry->second->index.GetEntry();
D_ASSERT(entry);
if (entry.get() == parent.get()) {
if (&entry == parent_p.get()) {
mapping.erase(mapping_entry);
}
}
}

catalog_entry_t CatalogSet::GenerateCatalogEntryIndex() {
return current_entry++;
}

bool CatalogSet::HasConflict(CatalogTransaction transaction, transaction_t timestamp) {
return (timestamp >= TRANSACTION_ID_START && timestamp != transaction.transaction_id) ||
(timestamp < TRANSACTION_ID_START && timestamp > transaction.start_time);
Expand Down Expand Up @@ -439,23 +442,23 @@ bool CatalogSet::UseTimestamp(CatalogTransaction transaction, transaction_t time

CatalogEntry &CatalogSet::GetEntryForTransaction(CatalogTransaction transaction, CatalogEntry &current) {
reference<CatalogEntry> entry(current);
while (entry.get().child) {
while (entry.get().HasChild()) {
if (UseTimestamp(transaction, entry.get().timestamp)) {
break;
}
entry = *entry.get().child;
entry = entry.get().Child();
}
return entry.get();
}

CatalogEntry &CatalogSet::GetCommittedEntry(CatalogEntry &current) {
reference<CatalogEntry> entry(current);
while (entry.get().child) {
while (entry.get().HasChild()) {
if (entry.get().timestamp < TRANSACTION_ID_START) {
// this entry is committed: use it
break;
}
entry = *entry.get().child;
entry = entry.get().Child();
}
return entry.get();
}
Expand Down Expand Up @@ -489,7 +492,7 @@ optional_ptr<CatalogEntry> CatalogSet::CreateEntryInternal(CatalogTransaction tr
entry->set = this;
entry->timestamp = 0;

auto entry_index = PutEntry(current_entry++, std::move(entry));
auto entry_index = PutEntry(GenerateCatalogEntryIndex(), std::move(entry));
PutMapping(transaction, name, std::move(entry_index));
mapping[name]->timestamp = 0;
return catalog_entry;
Expand Down Expand Up @@ -545,7 +548,7 @@ optional_ptr<CatalogEntry> CatalogSet::GetEntry(CatalogTransaction transaction,
// we found an entry for this name
// check the version numbers

auto &catalog_entry = *mapping_value->index.GetEntry();
auto &catalog_entry = mapping_value->index.GetEntry();
auto &current = GetEntryForTransaction(transaction, catalog_entry);
const bool entry_is_invalid = !IncludeEntry(include_deleted, current.deleted);
const bool entry_is_out_of_date = current.name != name && !UseTimestamp(transaction, mapping_value->timestamp);
Expand All @@ -572,7 +575,7 @@ void CatalogSet::Undo(CatalogEntry &entry) {
// and entry->parent has to be removed ("rolled back")

// i.e. we have to place (entry) as (entry->parent) again
auto &to_be_removed_node = *entry.parent;
auto &to_be_removed_node = *entry.Parent();

if (!StringUtil::CIEquals(entry.name, to_be_removed_node.name)) {
// rename: clean up the new name when the rename is rolled back
Expand All @@ -584,18 +587,15 @@ void CatalogSet::Undo(CatalogEntry &entry) {
mapping.erase(removed_entry);
}
}
if (to_be_removed_node.parent) {
if (to_be_removed_node.HasParent()) {
// if the to be removed node has a parent, set the child pointer to the
// to be restored node
auto preserved_child = std::move(to_be_removed_node.parent->child);
to_be_removed_node.parent->child = std::move(to_be_removed_node.child);
entry.parent = to_be_removed_node.parent;
to_be_removed_node.Parent()->SetChild(to_be_removed_node.TakeChild());
} else {
// otherwise we need to update the base entry tables
auto &name = entry.name;
to_be_removed_node.child->SetAsRoot();
mapping[name]->index.GetEntry() = std::move(to_be_removed_node.child);
entry.parent = nullptr;
to_be_removed_node.Child().SetAsRoot();
mapping[name]->index.SetEntry(to_be_removed_node.TakeChild());
}

// restore the name if it was deleted
Expand Down Expand Up @@ -642,7 +642,7 @@ void CatalogSet::Scan(CatalogTransaction transaction, const std::function<void(C
CreateDefaultEntries(transaction, lock);

for (auto &kv : entries) {
auto &entry = *kv.second.entry.get();
auto &entry = kv.second.Entry();
auto &entry_for_transaction = GetEntryForTransaction(transaction, entry);
if (!entry_for_transaction.deleted) {
callback(entry_for_transaction);
Expand All @@ -658,8 +658,8 @@ void CatalogSet::Scan(const std::function<void(CatalogEntry &)> &callback) {
// lock the catalog set
lock_guard<mutex> lock(catalog_lock);
for (auto &kv : entries) {
auto entry = kv.second.entry.get();
auto &commited_entry = GetCommittedEntry(*entry);
auto &entry = kv.second.Entry();
auto &commited_entry = GetCommittedEntry(entry);
if (!commited_entry.deleted) {
callback(commited_entry);
}
Expand Down
10 changes: 10 additions & 0 deletions src/include/duckdb/catalog/catalog_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class CatalogEntry {
bool internal;
//! Timestamp at which the catalog entry was created
atomic<transaction_t> timestamp;

private:
//! Child entry
unique_ptr<CatalogEntry> child;
//! Parent entry (the node that dependents_map this node)
Expand Down Expand Up @@ -77,6 +79,14 @@ class CatalogEntry {
void Serialize(Serializer &serializer) const;
static unique_ptr<CreateInfo> Deserialize(Deserializer &deserializer);

public:
void SetChild(unique_ptr<CatalogEntry> child);
unique_ptr<CatalogEntry> TakeChild();
bool HasChild() const;
bool HasParent() const;
CatalogEntry &Child();
optional_ptr<CatalogEntry> Parent();

public:
template <class TARGET>
TARGET &Cast() {
Expand Down
35 changes: 29 additions & 6 deletions src/include/duckdb/catalog/catalog_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ class DependencySetCatalogEntry;

typedef unordered_map<CatalogSet *, unique_lock<mutex>> set_lock_map_t;

using catalog_entry_t = idx_t;
struct EntryValue {
EntryValue() {
throw InternalException("EntryValue called without a catalog entry");
}

EntryValue() = delete;
explicit EntryValue(unique_ptr<CatalogEntry> entry_p) : entry(std::move(entry_p)), reference_count(0) {
}
//! enable move constructors
Expand All @@ -52,6 +50,30 @@ struct EntryValue {
Swap(other);
return *this;
}
template <bool UNSAFE = false>
CatalogEntry &Entry() {
if (UNSAFE) {
return *entry.get();
} else {
return *entry;
}
}
unique_ptr<CatalogEntry> TakeEntry() {
return std::move(entry);
}
void SetEntry(unique_ptr<CatalogEntry> entry_p) {
entry = std::move(entry_p);
}
void IncreaseRefCount() {
reference_count++;
}
bool DecreaseRefCount() {
D_ASSERT(reference_count != 0);
reference_count--;
return reference_count == 0;
}

private:
void Swap(EntryValue &other) {
std::swap(entry, other.entry);
idx_t count = reference_count;
Expand Down Expand Up @@ -127,6 +149,7 @@ class CatalogSet {
void Verify(Catalog &catalog);

private:
catalog_entry_t GenerateCatalogEntryIndex();
//! Given a root entry, gets the entry valid for this transaction
CatalogEntry &GetEntryForTransaction(CatalogTransaction transaction, CatalogEntry &current);
CatalogEntry &GetCommittedEntry(CatalogEntry &current);
Expand Down Expand Up @@ -157,11 +180,11 @@ class CatalogSet {
//! The catalog lock is used to make changes to the data
mutex catalog_lock;
//! The set of catalog entries
unordered_map<idx_t, EntryValue> entries;
unordered_map<catalog_entry_t, EntryValue> entries;
//! Mapping of string to catalog entry
case_insensitive_map_t<unique_ptr<MappingValue>> mapping;
//! The current catalog entry index
idx_t current_entry = 0;
catalog_entry_t current_entry = 0;
//! The generator used to generate default internal entries
unique_ptr<DefaultGenerator> defaults;
};
Expand Down
Loading

0 comments on commit 7e0e91c

Please sign in to comment.