Skip to content

Commit

Permalink
Merge pull request #259 from duckdb/attachschema
Browse files Browse the repository at this point in the history
Fix #221: add the SCHEMA parameter to ATTACH that allows you to specify which schema to load
  • Loading branch information
Mytherin authored Sep 4, 2024
2 parents ab0c42c + 33598a6 commit e7e52ec
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 25 deletions.
3 changes: 2 additions & 1 deletion src/include/storage/postgres_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class PostgresSchemaEntry;

class PostgresCatalog : public Catalog {
public:
explicit PostgresCatalog(AttachedDatabase &db_p, const string &path, AccessMode access_mode);
explicit PostgresCatalog(AttachedDatabase &db_p, const string &path, AccessMode access_mode, string schema_to_load);
~PostgresCatalog();

string path;
Expand Down Expand Up @@ -93,6 +93,7 @@ class PostgresCatalog : public Catalog {
PostgresVersion version;
PostgresSchemaSet schemas;
PostgresConnectionPool connection_pool;
string default_schema;
};

} // namespace duckdb
2 changes: 1 addition & 1 deletion src/include/storage/postgres_index_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class PostgresIndexSet : public PostgresInSchemaSet {
PostgresIndexSet(PostgresSchemaEntry &schema, unique_ptr<PostgresResultSlice> index_result = nullptr);

public:
static string GetInitializeQuery();
static string GetInitializeQuery(const string &schema = string());

optional_ptr<CatalogEntry> CreateIndex(ClientContext &context, CreateIndexInfo &info, TableCatalogEntry &table);

Expand Down
8 changes: 6 additions & 2 deletions src/include/storage/postgres_schema_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ struct CreateSchemaInfo;

class PostgresSchemaSet : public PostgresCatalogSet {
public:
explicit PostgresSchemaSet(Catalog &catalog);
explicit PostgresSchemaSet(Catalog &catalog, string schema_to_load);

public:
optional_ptr<CatalogEntry> CreateSchema(ClientContext &context, CreateSchemaInfo &info);

static string GetInitializeQuery();
static string GetInitializeQuery(const string &schema = string());

protected:
void LoadEntries(ClientContext &context) override;

protected:
//! Schema to load - if empty loads all schemas (default behavior)
string schema_to_load;
};

} // namespace duckdb
4 changes: 2 additions & 2 deletions src/include/storage/postgres_type_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ class PostgresTypeSet : public PostgresInSchemaSet {
public:
optional_ptr<CatalogEntry> CreateType(ClientContext &context, CreateTypeInfo &info);

static string GetInitializeEnumsQuery(PostgresVersion version);
static string GetInitializeCompositesQuery();
static string GetInitializeEnumsQuery(PostgresVersion version, const string &schema = string());
static string GetInitializeCompositesQuery(const string &schema = string());

protected:
bool HasInternalDependencies() const override {
Expand Down
5 changes: 4 additions & 1 deletion src/postgres_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,15 @@ static unique_ptr<Catalog> PostgresAttach(StorageExtensionInfo *storage_info, Cl
string connection_string = info.path;

string secret_name;
string schema_to_load;
for (auto &entry : info.options) {
auto lower_name = StringUtil::Lower(entry.first);
if (lower_name == "type" || lower_name == "read_only") {
// already handled
} else if (lower_name == "secret") {
secret_name = entry.second.ToString();
} else if (lower_name == "schema") {
schema_to_load = entry.second.ToString();
} else {
throw BinderException("Unrecognized option for Postgres attach: %s", entry.first);
}
Expand Down Expand Up @@ -93,7 +96,7 @@ static unique_ptr<Catalog> PostgresAttach(StorageExtensionInfo *storage_info, Cl
// secret not found and one was explicitly provided - throw an error
throw BinderException("Secret with name \"%s\" not found", secret_name);
}
return make_uniq<PostgresCatalog>(db, connection_string, access_mode);
return make_uniq<PostgresCatalog>(db, connection_string, access_mode, std::move(schema_to_load));
}

static unique_ptr<TransactionManager> PostgresCreateTransactionManager(StorageExtensionInfo *storage_info,
Expand Down
11 changes: 8 additions & 3 deletions src/storage/postgres_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@

namespace duckdb {

PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, const string &path, AccessMode access_mode)
: Catalog(db_p), path(path), access_mode(access_mode), schemas(*this), connection_pool(*this) {
PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, const string &path, AccessMode access_mode,
string schema_to_load)
: Catalog(db_p), path(path), access_mode(access_mode), schemas(*this, schema_to_load), connection_pool(*this),
default_schema(schema_to_load) {
if (default_schema.empty()) {
default_schema = "public";
}
Value connection_limit;
auto &db_instance = db_p.GetDatabase();
if (db_instance.TryGetCurrentSetting("pg_connection_limit", connection_limit)) {
Expand Down Expand Up @@ -62,7 +67,7 @@ optional_ptr<SchemaCatalogEntry> PostgresCatalog::GetSchema(CatalogTransaction t
OnEntryNotFound if_not_found,
QueryErrorContext error_context) {
if (schema_name == DEFAULT_SCHEMA) {
return GetSchema(transaction, "public", if_not_found, error_context);
return GetSchema(transaction, default_schema, if_not_found, error_context);
}
auto &postgres_transaction = PostgresTransaction::Get(transaction.GetContext(), *this);
if (schema_name == "pg_temp") {
Expand Down
10 changes: 8 additions & 2 deletions src/storage/postgres_index_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ PostgresIndexSet::PostgresIndexSet(PostgresSchemaEntry &schema, unique_ptr<Postg
: PostgresInSchemaSet(schema, !index_result_p), index_result(std::move(index_result_p)) {
}

string PostgresIndexSet::GetInitializeQuery() {
return R"(
string PostgresIndexSet::GetInitializeQuery(const string &schema) {
string base_query = R"(
SELECT pg_namespace.oid, tablename, indexname
FROM pg_indexes
JOIN pg_namespace ON (schemaname=nspname)
${CONDITION}
ORDER BY pg_namespace.oid;
)";
string condition;
if (!schema.empty()) {
condition += "WHERE pg_namespace.nspname=" + KeywordHelper::WriteQuoted(schema);
}
return StringUtil::Replace(base_query, "${CONDITION}", condition);
}

void PostgresIndexSet::LoadEntries(ClientContext &context) {
Expand Down
23 changes: 15 additions & 8 deletions src/storage/postgres_schema_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

namespace duckdb {

PostgresSchemaSet::PostgresSchemaSet(Catalog &catalog) : PostgresCatalogSet(catalog, false) {
PostgresSchemaSet::PostgresSchemaSet(Catalog &catalog, string schema_to_load_p)
: PostgresCatalogSet(catalog, false), schema_to_load(std::move(schema_to_load_p)) {
}

vector<unique_ptr<PostgresResultSlice>> SliceResult(PostgresResult &schemas, unique_ptr<PostgresResult> to_slice_ptr) {
Expand All @@ -33,22 +34,28 @@ vector<unique_ptr<PostgresResultSlice>> SliceResult(PostgresResult &schemas, uni
return result;
}

string PostgresSchemaSet::GetInitializeQuery() {
return R"(
string PostgresSchemaSet::GetInitializeQuery(const string &schema) {
string base_query = R"(
SELECT oid, nspname
FROM pg_namespace
${CONDITION}
ORDER BY oid;
)";
string condition;
if (!schema.empty()) {
condition += "WHERE pg_namespace.nspname=" + KeywordHelper::WriteQuoted(schema);
}
return StringUtil::Replace(base_query, "${CONDITION}", condition);
}

void PostgresSchemaSet::LoadEntries(ClientContext &context) {
auto &pg_catalog = catalog.Cast<PostgresCatalog>();
auto pg_version = pg_catalog.GetPostgresVersion();
string schema_query = PostgresSchemaSet::GetInitializeQuery();
string tables_query = PostgresTableSet::GetInitializeQuery();
string enum_types_query = PostgresTypeSet::GetInitializeEnumsQuery(pg_version);
string composite_types_query = PostgresTypeSet::GetInitializeCompositesQuery();
string index_query = PostgresIndexSet::GetInitializeQuery();
string schema_query = PostgresSchemaSet::GetInitializeQuery(schema_to_load);
string tables_query = PostgresTableSet::GetInitializeQuery(schema_to_load);
string enum_types_query = PostgresTypeSet::GetInitializeEnumsQuery(pg_version, schema_to_load);
string composite_types_query = PostgresTypeSet::GetInitializeCompositesQuery(schema_to_load);
string index_query = PostgresIndexSet::GetInitializeQuery(schema_to_load);

auto full_query = schema_query + tables_query + enum_types_query + composite_types_query + index_query;

Expand Down
20 changes: 16 additions & 4 deletions src/storage/postgres_type_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ PostgresTypeSet::PostgresTypeSet(PostgresSchemaEntry &schema, unique_ptr<Postgre
composite_type_result(std::move(composite_type_result_p)) {
}

string PostgresTypeSet::GetInitializeEnumsQuery(PostgresVersion version) {
string PostgresTypeSet::GetInitializeEnumsQuery(PostgresVersion version, const string &schema) {
if (version.major_v < 8 || (version.major_v == 8 && version.minor_v < 3)) {
// pg_enum support has been present since v8.3 - https://www.postgresql.org/docs/8.3/catalog-pg-enum.html
// for older postgres versions we don't support enums instead
Expand All @@ -29,13 +29,19 @@ SELECT 0 AS oid, 0 AS enumtypid, '' AS typname, '' AS enumlabel
LIMIT 0;
)";
}
return R"(
string base_query = R"(
SELECT n.oid, enumtypid, typname, enumlabel
FROM pg_enum e
JOIN pg_type t ON e.enumtypid = t.oid
JOIN pg_namespace AS n ON (typnamespace=n.oid)
${CONDITION}
ORDER BY n.oid, enumtypid, enumsortorder;
)";
string condition;
if (!schema.empty()) {
condition += "WHERE n.nspname=" + KeywordHelper::WriteQuoted(schema);
}
return StringUtil::Replace(base_query, "${CONDITION}", condition);
}

void PostgresTypeSet::CreateEnum(PostgresResult &result, idx_t start_row, idx_t end_row) {
Expand Down Expand Up @@ -75,8 +81,8 @@ void PostgresTypeSet::InitializeEnums(PostgresResultSlice &enums) {
}
}

string PostgresTypeSet::GetInitializeCompositesQuery() {
return R"(
string PostgresTypeSet::GetInitializeCompositesQuery(const string &schema) {
string base_query = R"(
SELECT n.oid, t.typrelid AS id, t.typname as type, pg_attribute.attname, sub_type.typname
FROM pg_type t
JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
Expand All @@ -85,8 +91,14 @@ JOIN pg_attribute ON attrelid=t.typrelid
JOIN pg_type sub_type ON (pg_attribute.atttypid=sub_type.oid)
WHERE pg_class.relkind = 'c'
AND t.typtype='c'
${CONDITION}
ORDER BY n.oid, t.oid, attrelid, attnum;
)";
string condition;
if (!schema.empty()) {
condition += "AND n.nspname=" + KeywordHelper::WriteQuoted(schema);
}
return StringUtil::Replace(base_query, "${CONDITION}", condition);
}

void PostgresTypeSet::CreateCompositeType(PostgresTransaction &transaction, PostgresResult &result, idx_t start_row,
Expand Down
2 changes: 1 addition & 1 deletion test/other.sql
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ CREATE TABLE my_table (
insert into my_table values (42, 'something', 'something else');


CREATE SCHEMA some_schema;
CREATE SCHEMA some_schema;

create type some_schema.some_enum as enum('one', 'two');

Expand Down
36 changes: 36 additions & 0 deletions test/sql/storage/attach_schema_param.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# name: test/sql/storage/attach_schema_param.test
# description: Test attaching only a specific schema
# group: [storage]

require postgres_scanner

require-env POSTGRES_TEST_DATABASE_AVAILABLE

statement ok
PRAGMA enable_verification

statement ok
ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES, SCHEMA 'some_schema')

query I
SELECT * FROM s.some_schema.some_table
----
two

query I
SELECT * FROM s.some_table
----
two

statement error
SELECT * FROM s.public.my_table
----
does not exist

statement ok
USE s;

query I
SELECT * FROM some_table
----
two

0 comments on commit e7e52ec

Please sign in to comment.