Skip to content

Commit

Permalink
using logical catalog entries instead of physical ones in the depende…
Browse files Browse the repository at this point in the history
…ncy manager
  • Loading branch information
Tishj committed Oct 24, 2023
1 parent 812c4a9 commit fc4be69
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 73 deletions.
1 change: 1 addition & 0 deletions src/catalog/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ add_library_unity(
catalog_set.cpp
catalog_transaction.cpp
duck_catalog.cpp
dependency.cpp
dependency_list.cpp
dependency_manager.cpp
similar_catalog_entry.cpp)
Expand Down
11 changes: 0 additions & 11 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,17 +296,6 @@ struct CatalogLookup {
string schema;
};

//! Return value of Catalog::LookupEntry
struct CatalogEntryLookup {
optional_ptr<SchemaCatalogEntry> schema;
optional_ptr<CatalogEntry> entry;
PreservedError error;

DUCKDB_API bool Found() const {
return entry;
}
};

//===--------------------------------------------------------------------===//
// Generic
//===--------------------------------------------------------------------===//
Expand Down
97 changes: 94 additions & 3 deletions src/catalog/dependency_list.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "duckdb/catalog/dependency_list.hpp"
#include "duckdb/catalog/catalog_entry.hpp"
#include "duckdb/common/serializer/deserializer.hpp"
#include "duckdb/common/serializer/serializer.hpp"
#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp"
#include "duckdb/catalog/catalog.hpp"

namespace duckdb {
Expand All @@ -13,14 +16,102 @@ void DependencyList::AddDependency(CatalogEntry &entry) {

void DependencyList::VerifyDependencies(Catalog &catalog, const string &name) {
for (auto &dep_entry : set) {
auto &dep = dep_entry.get();
if (&dep.ParentCatalog() != &catalog) {
auto &dep = dep_entry;
if (dep.entry.catalog != catalog.GetName()) {
throw DependencyException(
"Error adding dependency for object \"%s\" - dependency \"%s\" is in catalog "
"\"%s\", which does not match the catalog \"%s\".\nCross catalog dependencies are not supported.",
name, dep.name, dep.ParentCatalog().GetName(), catalog.GetName());
name, dep.entry.name, dep.entry.catalog, catalog.GetName());
}
}
}

LogicalDependencyList DependencyList::GetLogical() const {
LogicalDependencyList result;
for (auto &entry : set) {
result.AddDependency(entry);
}
return result;
}

bool DependencyList::Contains(CatalogEntry &entry) {
return set.count(entry);
}

void LogicalDependencyList::AddDependency(CatalogEntry &entry) {
Dependency dependency(entry);
set.insert(std::move(dependency));
}

void LogicalDependencyList::AddDependency(const Dependency &entry) {
set.insert(entry);
}

bool LogicalDependencyList::Contains(CatalogEntry &entry_p) {
Dependency logical_entry(entry_p);
return set.count(logical_entry);
}

// DependencyList LogicalDependencyList::GetPhysical(ClientContext &context, Catalog &catalog) const {
// DependencyList dependencies;

// for (auto &entry : set) {
// auto &name = entry.name;
// // Don't use the serialized catalog name, could be attached with a different name
// auto &schema = entry.schema;
// auto &type = entry.type;

// CatalogEntryLookup lookup;
// if (type == CatalogType::SCHEMA_ENTRY) {
// auto lookup = catalog.GetSchema(context, name, OnEntryNotFound::THROW_EXCEPTION);
// D_ASSERT(lookup);
// dependencies.AddDependency(*lookup);
// } else {
// auto lookup = catalog.LookupEntry(context, type, schema, name, OnEntryNotFound::THROW_EXCEPTION);
// D_ASSERT(lookup.Found());
// auto catalog_entry = lookup.entry;
// dependencies.AddDependency(*catalog_entry);
// }
// }
// return dependencies;
//}

// void LogicalDependencyList::VerifyDependencies(Catalog &catalog, const string &name) {
// for (auto &dep : set) {
// if (dep.catalog != catalog.GetName()) {
// throw DependencyException(
// "Error adding dependency for object \"%s\" - dependency \"%s\" is in catalog "
// "\"%s\", which does not match the catalog \"%s\".\nCross catalog dependencies are not supported.",
// name, dep.name, dep.catalog, catalog.GetName());
// }
// }
//}

void LogicalDependencyList::Serialize(Serializer &serializer) const {
// serializer.WriteProperty(0, "logical_dependencies", set);
}

const LogicalDependencyList::create_info_set_t &LogicalDependencyList::Set() const {
return set;
}

LogicalDependencyList LogicalDependencyList::Deserialize(Deserializer &deserializer) {
LogicalDependencyList dependency;
// dependency.set = deserializer.ReadProperty<create_info_set_t>(0, "logical_dependencies");
return dependency;
}

bool LogicalDependencyList::operator==(const LogicalDependencyList &other) const {
if (set.size() != other.set.size()) {
return false;
}

for (auto &entry : set) {
if (!other.set.count(entry)) {
return false;
}
}
return true;
}

} // namespace duckdb
90 changes: 50 additions & 40 deletions src/catalog/dependency_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,26 @@ namespace duckdb {
DependencyManager::DependencyManager(DuckCatalog &catalog) : catalog(catalog) {
}

static CatalogEntryLookup LookupEntry(const Dependency &dep, Catalog &catalog, CatalogTransaction &transaction) {
auto &entry = dep.entry;
auto &name = entry.name;
auto &schema = entry.schema;
auto &type = entry.type;

if (entry.type == CatalogType::SCHEMA_ENTRY /*|| entry.type == CatalogType::DATABASE_ENTRY */) {
auto schema = catalog.GetSchema(transaction, name, OnEntryNotFound::THROW_EXCEPTION);
auto entry = optional_ptr<CatalogEntry>(schema.get());
return CatalogEntryLookup {nullptr, entry, PreservedError()};
} else {
return catalog.TryLookupEntryInternal(transaction, type, schema, name);
}
}

void DependencyManager::AddObject(CatalogTransaction transaction, CatalogEntry &object, DependencyList &dependencies) {
// check for each object in the sources if they were not deleted yet
for (auto &dep : dependencies.set) {
auto &dependency = dep.get();
if (&dependency.ParentCatalog() != &object.ParentCatalog()) {
throw DependencyException(
"Error adding dependency for object \"%s\" - dependency \"%s\" is in catalog "
"\"%s\", which does not match the catalog \"%s\".\nCross catalog dependencies are not supported.",
object.name, dependency.name, dependency.ParentCatalog().GetName(), object.ParentCatalog().GetName());
}
if (!dependency.set) {
throw InternalException("Dependency has no set");
}
auto catalog_entry = dependency.set->GetEntryInternal(transaction, dependency.name, nullptr);
if (!catalog_entry) {
auto catalog_entry = LookupEntry(dep, catalog, transaction);
if (!catalog_entry.entry) {
throw InternalException("Dependency has already been deleted?");
}
}
Expand All @@ -46,28 +51,31 @@ void DependencyManager::AddObject(CatalogTransaction transaction, CatalogEntry &
}

void DependencyManager::DropObject(CatalogTransaction transaction, CatalogEntry &object, bool cascade) {
D_ASSERT(dependents_map.find(object) != dependents_map.end());
auto dep_entry = Dependency(object);
D_ASSERT(dependents_map.find(dep_entry) != dependents_map.end());

// first check the objects that depend on this object
auto &dependent_objects = dependents_map[object];
auto &dependent_objects = dependents_map.at(object);
for (auto &dep : dependent_objects) {
// look up the entry in the catalog set
auto &entry = dep.entry.get();
auto &catalog_set = *entry.set;
auto mapping_value = catalog_set.GetMapping(transaction, entry.name, true /* get_latest */);
if (mapping_value == nullptr) {
auto lookup = LookupEntry(dep, catalog, transaction);
D_ASSERT(!lookup.error);
if (!lookup.entry) {
// the dependent object was already deleted, no conflict
continue;
}
auto dependency_entry = catalog_set.GetEntryInternal(transaction, mapping_value->index);
if (!dependency_entry) {
// the dependent object was already deleted, no conflict
auto &dependency_entry = *lookup.entry;
D_ASSERT(dependency_entry.set);
auto &catalog_set = *dependency_entry.set;
auto mapping_value = catalog_set.GetMapping(transaction, dep.entry.name, /* get_latest = */ true);
if (mapping_value == nullptr) {
continue;
}
// conflict: attempting to delete this object but the dependent object still exists
if (cascade || dep.dependency_type == DependencyType::DEPENDENCY_AUTOMATIC ||
dep.dependency_type == DependencyType::DEPENDENCY_OWNS) {
// cascade: drop the dependent object
catalog_set.DropEntryInternal(transaction, mapping_value->index.Copy(), *dependency_entry, cascade);
catalog_set.DropEntryInternal(transaction, mapping_value->index.Copy(), dependency_entry, cascade);
} else {
// no cascade and there are objects that depend on this object: throw error
throw DependencyException("Cannot drop entry \"%s\" because there are entries that "
Expand All @@ -82,20 +90,19 @@ void DependencyManager::AlterObject(CatalogTransaction transaction, CatalogEntry
D_ASSERT(dependencies_map.find(old_obj) != dependencies_map.end());

// first check the objects that depend on this object
catalog_entry_vector_t owned_objects_to_add;
auto &dependent_objects = dependents_map[old_obj];
dependency_set_t owned_objects_to_add;
auto &dependent_objects = dependents_map.at(old_obj);
for (auto &dep : dependent_objects) {
// look up the entry in the catalog set
auto &entry = dep.entry.get();
auto &catalog_set = *entry.set;
auto dependency_entry = catalog_set.GetEntryInternal(transaction, entry.name, nullptr);
if (!dependency_entry) {
auto lookup = LookupEntry(dep, catalog, transaction);
D_ASSERT(!lookup.error);
if (!lookup.entry) {
// the dependent object was already deleted, no conflict
continue;
}
if (dep.dependency_type == DependencyType::DEPENDENCY_OWNS) {
// the dependent object is owned by the current object
owned_objects_to_add.push_back(dep.entry);
owned_objects_to_add.insert(dep);
continue;
}
// conflict: attempting to alter this object but the dependent object still exists
Expand All @@ -107,7 +114,7 @@ void DependencyManager::AlterObject(CatalogTransaction transaction, CatalogEntry
// add the new object to the dependents_map of each object that it depends on
auto &old_dependencies = dependencies_map[old_obj];
for (auto &dep : old_dependencies) {
auto &dependency = dep.get();
auto &dependency = dep;
dependents_map[dependency].insert(new_obj);
}

Expand All @@ -117,7 +124,7 @@ void DependencyManager::AlterObject(CatalogTransaction transaction, CatalogEntry
dependencies_map[new_obj] = old_dependencies;

for (auto &dependency : owned_objects_to_add) {
dependents_map[new_obj].insert(Dependency(dependency, DependencyType::DEPENDENCY_OWNS));
dependents_map[new_obj].insert(dependency);
dependents_map[dependency].insert(Dependency(new_obj, DependencyType::DEPENDENCY_OWNED_BY));
dependencies_map[new_obj].insert(dependency);
}
Expand Down Expand Up @@ -152,31 +159,34 @@ void DependencyManager::Scan(const std::function<void(CatalogEntry &, CatalogEnt
lock_guard<mutex> write_lock(catalog.GetWriteLock());
for (auto &entry : dependents_map) {
for (auto &dependent : entry.second) {
callback(entry.first, dependent.entry, dependent.dependency_type);
// FIXME: don't have a CatalogTransaction here, can't do a lookup to get the entry??
throw NotImplementedException("FIXME");
// callback(entry.first, dependent.entry, dependent.dependency_type);
}
}
}

void DependencyManager::AddOwnership(CatalogTransaction transaction, CatalogEntry &owner, CatalogEntry &entry) {
void DependencyManager::AddOwnership(CatalogTransaction transaction, CatalogEntry &owner_p, CatalogEntry &entry) {
// lock the catalog for writing
lock_guard<mutex> write_lock(catalog.GetWriteLock());

// If the owner is already owned by something else, throw an error
for (auto &dep : dependents_map[owner]) {
for (auto &dep : dependents_map[owner_p]) {
if (dep.dependency_type == DependencyType::DEPENDENCY_OWNED_BY) {
throw DependencyException(owner.name + " already owned by " + dep.entry.get().name);
throw DependencyException(owner_p.name + " already owned by " + dep.entry.name);
}
}

// If the entry is already owned, throw an error
auto owner = Dependency(owner_p);
for (auto &dep : dependents_map[entry]) {
// if the entry is already owned, throw error
if (&dep.entry.get() != &owner) {
throw DependencyException(entry.name + " already depends on " + dep.entry.get().name);
if (dep.entry != owner.entry) {
throw DependencyException(entry.name + " already depends on " + dep.entry.name);
}
// if the entry owns the owner, throw error
if (&dep.entry.get() == &owner && dep.dependency_type == DependencyType::DEPENDENCY_OWNS) {
throw DependencyException(entry.name + " already owns " + owner.name +
if (dep.entry == owner.entry && dep.dependency_type == DependencyType::DEPENDENCY_OWNS) {
throw DependencyException(entry.name + " already owns " + owner.entry.name +
". Cannot have circular dependencies");
}
}
Expand All @@ -185,7 +195,7 @@ void DependencyManager::AddOwnership(CatalogTransaction transaction, CatalogEntr
// In the case AddOwnership is called twice, because of emplace, the object will not be repeated in the set.
// We use an automatic dependency because if the Owner gets deleted, then the owned objects are also deleted
dependents_map[owner].emplace(entry, DependencyType::DEPENDENCY_OWNS);
dependents_map[entry].emplace(owner, DependencyType::DEPENDENCY_OWNED_BY);
dependents_map[entry].emplace(owner.entry, DependencyType::DEPENDENCY_OWNED_BY);
dependencies_map[owner].emplace(entry);
}

Expand Down
19 changes: 16 additions & 3 deletions src/include/duckdb/catalog/catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "duckdb/common/atomic.hpp"
#include "duckdb/common/optional_ptr.hpp"
#include "duckdb/common/enums/on_entry_not_found.hpp"
#include "duckdb/common/preserved_error.hpp"
#include <functional>

namespace duckdb {
Expand Down Expand Up @@ -67,6 +68,17 @@ class LogicalDelete;
class LogicalUpdate;
class CreateStatement;

//! Return value of Catalog::LookupEntry
struct CatalogEntryLookup {
optional_ptr<SchemaCatalogEntry> schema;
optional_ptr<CatalogEntry> entry;
PreservedError error;

DUCKDB_API bool Found() const {
return entry;
}
};

//! The Catalog object represents the catalog of the database.
class Catalog {
public:
Expand Down Expand Up @@ -309,14 +321,15 @@ class Catalog {
static bool AutoLoadExtensionByCatalogEntry(ClientContext &context, CatalogType type, const string &entry_name);
DUCKDB_API static bool TryAutoLoad(ClientContext &context, const string &extension_name) noexcept;

//! Lookup an entry in the schema, returning a lookup with the entry and schema if they exist
CatalogEntryLookup TryLookupEntryInternal(CatalogTransaction transaction, CatalogType type, const string &schema,
const string &name);

protected:
//! Reference to the database
AttachedDatabase &db;

private:
//! Lookup an entry in the schema, returning a lookup with the entry and schema if they exist
CatalogEntryLookup TryLookupEntryInternal(CatalogTransaction transaction, CatalogType type, const string &schema,
const string &name);
//! Calls LookupEntryInternal on the schema, trying other schemas if the schema is invalid. Sets
//! CatalogEntryLookup->error depending on if_not_found when no entry is found
CatalogEntryLookup TryLookupEntry(ClientContext &context, CatalogType type, const string &schema,
Expand Down
Loading

0 comments on commit fc4be69

Please sign in to comment.