diff --git a/.github/workflows/Rest.yml b/.github/workflows/Rest.yml index df09d8c..5f11464 100644 --- a/.github/workflows/Rest.yml +++ b/.github/workflows/Rest.yml @@ -50,3 +50,9 @@ jobs: working-directory: scripts/ run: | ./start-rest-catalog.sh + + - name: Test With rest catalog + env: + ICEBERG_SERVER_AVAILABLE: 1 + run: | + make test_release \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index fe79363..37fb76f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,12 +13,24 @@ include_directories(src/include) set(EXTENSION_SOURCES src/iceberg_extension.cpp src/iceberg_functions.cpp + src/catalog_api.cpp + src/catalog_utils.cpp src/common/utils.cpp src/common/schema.cpp src/common/iceberg.cpp src/iceberg_functions/iceberg_snapshots.cpp src/iceberg_functions/iceberg_scan.cpp - src/iceberg_functions/iceberg_metadata.cpp) + src/iceberg_functions/iceberg_metadata.cpp + src/storage/irc_catalog.cpp + src/storage/irc_catalog_set.cpp + src/storage/irc_clear_cache.cpp + src/storage/irc_schema_entry.cpp + src/storage/irc_schema_set.cpp + src/storage/irc_table_entry.cpp + src/storage/irc_table_set.cpp + src/storage/irc_transaction.cpp + src/storage/irc_transaction_manager.cpp +) add_library(${EXTENSION_NAME} STATIC ${EXTENSION_SOURCES}) @@ -73,10 +85,12 @@ target_link_libraries( Snappy::snappy ZLIB::ZLIB) +find_package(CURL REQUIRED) + # Link dependencies into extension -target_link_libraries(${EXTENSION_NAME} PUBLIC optimized avro_static_release +target_link_libraries(${EXTENSION_NAME} PUBLIC optimized avro_static_release CURL::libcurl debug avro_static_debug) -target_link_libraries(${TARGET_NAME}_loadable_extension optimized +target_link_libraries(${TARGET_NAME}_loadable_extension optimized CURL::libcurl avro_static_release debug avro_static_debug) install( diff --git a/scripts/provision.py b/scripts/provision.py index a71883b..064e55b 100644 --- a/scripts/provision.py +++ b/scripts/provision.py @@ -151,3 +151,10 @@ (CAST('2023-03-12' AS 
date), 12, 'l'); """ ) + +spark.sql( + """ + Delete from default.table_mor_deletes + where number > 3 and number < 10; + """ +) diff --git a/scripts/requirements.txt b/scripts/requirements.txt index 67f2796..7c579ec 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements.txt @@ -1 +1 @@ -pyspark==3.4.1 \ No newline at end of file +pyspark==3.5.0 \ No newline at end of file diff --git a/src/catalog_api.cpp b/src/catalog_api.cpp new file mode 100644 index 0000000..48b7243 --- /dev/null +++ b/src/catalog_api.cpp @@ -0,0 +1,460 @@ +#include "catalog_api.hpp" +#include "catalog_utils.hpp" +#include "storage/irc_catalog.hpp" +#include "yyjson.hpp" + +#include +#include +#include + +using namespace duckdb_yyjson; +namespace duckdb { + +//! We use a global here to store the path that is selected on the ICAPI::InitializeCurl call +static string SELECTED_CURL_CERT_PATH = ""; + +static size_t RequestWriteCallback(void *contents, size_t size, size_t nmemb, void *userp) { + ((std::string *)userp)->append((char *)contents, size * nmemb); + return size * nmemb; +} + +// we statically compile in libcurl, which means the cert file location of the build machine is the +// place curl will look. But not every distro has this file in the same location, so we search a +// number of common locations and use the first one we find. +static string certFileLocations[] = { + // Arch, Debian-based, Gentoo + "/etc/ssl/certs/ca-certificates.crt", + // RedHat 7 based + "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", + // Redhat 6 based + "/etc/pki/tls/certs/ca-bundle.crt", + // OpenSUSE + "/etc/ssl/ca-bundle.pem", + // Alpine + "/etc/ssl/cert.pem" +}; + +const string IRCAPI::API_VERSION_1 = "v1"; + +struct YyjsonDocDeleter { + void operator()(yyjson_doc* doc) { + yyjson_doc_free(doc); + } + void operator()(yyjson_mut_doc* doc) { + yyjson_mut_doc_free(doc); + } +}; + +// Look through the the above locations and if one of the files exists, set that as the location curl should use. 
+static bool SelectCurlCertPath() { + for (string& caFile : certFileLocations) { + struct stat buf; + if (stat(caFile.c_str(), &buf) == 0) { + SELECTED_CURL_CERT_PATH = caFile; break; + } + } + return !SELECTED_CURL_CERT_PATH.empty(); +} + +static bool SetCurlCAFileInfo(CURL* curl) { + if (!SELECTED_CURL_CERT_PATH.empty()) { + curl_easy_setopt(curl, CURLOPT_CAINFO, SELECTED_CURL_CERT_PATH.c_str()); + return true; + } + return false; +} + +// Note: every curl object we use should set this, because without it some linux distro's may not find the CA certificate. +static void InitializeCurlObject(CURL * curl, const string &token) { + if (!token.empty()) { + curl_easy_setopt(curl, CURLOPT_XOAUTH2_BEARER, token.c_str()); + curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BEARER); + } + SetCurlCAFileInfo(curl); +} + +template +static TYPE TemplatedTryGetYYJson(yyjson_val *obj, const string &field, TYPE default_val, + bool fail_on_missing = true) { + auto val = yyjson_obj_get(obj, field.c_str()); + if (val && yyjson_get_type(val) == TYPE_NUM) { + return get_function(val); + } else if (!fail_on_missing) { + return default_val; + } + throw IOException("Invalid field found while parsing field: " + field); +} + +static uint64_t TryGetNumFromObject(yyjson_val *obj, const string &field, bool fail_on_missing = true, + uint64_t default_val = 0) { + return TemplatedTryGetYYJson(obj, field, default_val, + fail_on_missing); +} +static bool TryGetBoolFromObject(yyjson_val *obj, const string &field, bool fail_on_missing = false, + bool default_val = false) { + return TemplatedTryGetYYJson(obj, field, default_val, + fail_on_missing); +} +static string TryGetStrFromObject(yyjson_val *obj, const string &field, bool fail_on_missing = true, + const char *default_val = "") { + return TemplatedTryGetYYJson(obj, field, default_val, + fail_on_missing); +} + +static string DeleteRequest(const string &url, const string &token = "", curl_slist *extra_headers = NULL) { + CURL *curl; + CURLcode res; + string readBuffer; + + curl = 
curl_easy_init(); + if (curl) { + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE"); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, RequestWriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer); + + if(extra_headers) { + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, extra_headers); + } + + InitializeCurlObject(curl, token); + res = curl_easy_perform(curl); + curl_easy_cleanup(curl); + + if (res != CURLcode::CURLE_OK) { + string error = curl_easy_strerror(res); + throw IOException("Curl DELETE Request to '%s' failed with error: '%s'", url, error); + } + + return readBuffer; + } + throw InternalException("Failed to initialize curl"); +} + +static string GetRequest(const string &url, const string &token = "", curl_slist *extra_headers = NULL) { + CURL *curl; + CURLcode res; + string readBuffer; + + curl = curl_easy_init(); + if (curl) { + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, RequestWriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer); + + if(extra_headers) { + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, extra_headers); + } + + InitializeCurlObject(curl, token); + res = curl_easy_perform(curl); + curl_easy_cleanup(curl); + + if (res != CURLcode::CURLE_OK) { + string error = curl_easy_strerror(res); + throw IOException("Curl Request to '%s' failed with error: '%s'", url, error); + } + + return readBuffer; + } + throw InternalException("Failed to initialize curl"); +} + +static string PostRequest( + const string &url, + const string &post_data, + const string &content_type = "x-www-form-urlencoded", + const string &token = "", + curl_slist *extra_headers = NULL) { + string readBuffer; + CURL *curl = curl_easy_init(); + if (!curl) { + throw InternalException("Failed to initialize curl"); + } + + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_POST, 1L); + curl_easy_setopt(curl, 
CURLOPT_POSTFIELDS, post_data.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, RequestWriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer); + + // Create default headers for content type + struct curl_slist *headers = NULL; + const string content_type_str = "Content-Type: application/" + content_type; + headers = curl_slist_append(headers, content_type_str.c_str()); + + // Append any extra headers + if (extra_headers) { + struct curl_slist *temp = extra_headers; + while (temp) { + headers = curl_slist_append(headers, temp->data); + temp = temp->next; + } + } + + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + InitializeCurlObject(curl, token); + + // Perform the request + CURLcode res = curl_easy_perform(curl); + + // Clean up + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + + if (res != CURLcode::CURLE_OK) { + string error = curl_easy_strerror(res); + throw IOException("Curl Request to '%s' failed with error: '%s'", url, error); + } + return readBuffer; +} + +static yyjson_doc *api_result_to_doc(const string &api_result) { + auto *doc = yyjson_read(api_result.c_str(), api_result.size(), 0); + auto *root = yyjson_doc_get_root(doc); + auto *error = yyjson_obj_get(root, "error"); + if (error != NULL) { + string err_msg = TryGetStrFromObject(error, "message"); + throw std::runtime_error(err_msg); + } + return doc; +} + +static string GetTableMetadata(const string &internal, const string &schema, const string &table, IRCCredentials credentials) { + struct curl_slist *extra_headers = NULL; + extra_headers = curl_slist_append(extra_headers, "X-Iceberg-Access-Delegation: vended-credentials"); + string api_result = GetRequest( + credentials.endpoint + IRCAPI::GetOptionallyPrefixedURL(IRCAPI::API_VERSION_1, internal) + "namespaces/" + schema + "/tables/" + table, + credentials.token, + extra_headers); + curl_slist_free_all(extra_headers); + return api_result; +} + +void IRCAPI::InitializeCurl() { + SelectCurlCertPath(); +} + 
+vector IRCAPI::GetCatalogs(const string &catalog, IRCCredentials credentials) { + throw NotImplementedException("ICAPI::GetCatalogs"); +} + +static IRCAPIColumnDefinition ParseColumnDefinition(yyjson_val *column_def) { + IRCAPIColumnDefinition result; + result.name = TryGetStrFromObject(column_def, "name"); + result.type_text = TryGetStrFromObject(column_def, "type"); + result.precision = (result.type_text == "decimal") ? TryGetNumFromObject(column_def, "type_precision") : -1; + result.scale = (result.type_text == "decimal") ? TryGetNumFromObject(column_def, "type_scale") : -1; + result.position = TryGetNumFromObject(column_def, "id") - 1; + return result; +} + +IRCAPITableCredentials IRCAPI::GetTableCredentials(const string &internal, const string &schema, const string &table, IRCCredentials credentials) { + IRCAPITableCredentials result; + string api_result = GetTableMetadata(internal, schema, table, credentials); + std::unique_ptr doc(api_result_to_doc(api_result)); + auto *root = yyjson_doc_get_root(doc.get()); + auto *aws_temp_credentials = yyjson_obj_get(root, "config"); + auto credential_size = yyjson_obj_size(aws_temp_credentials); + if (aws_temp_credentials && credential_size > 0) { + result.key_id = TryGetStrFromObject(aws_temp_credentials, "s3.access-key-id", false); + result.secret = TryGetStrFromObject(aws_temp_credentials, "s3.secret-access-key", false); + result.session_token = TryGetStrFromObject(aws_temp_credentials, "s3.session-token", false); + } + return result; +} + +string IRCAPI::GetToken(string id, string secret, string endpoint) { + string post_data = "grant_type=client_credentials&client_id=" + id + "&client_secret=" + secret + "&scope=PRINCIPAL_ROLE:ALL"; + string api_result = PostRequest(endpoint + "/v1/oauth/tokens", post_data); + std::unique_ptr doc(api_result_to_doc(api_result)); + auto *root = yyjson_doc_get_root(doc.get()); + return TryGetStrFromObject(root, "access_token"); +} + +static void populateTableMetadata(IRCAPITable 
&table, yyjson_val *metadata_root) { + table.storage_location = TryGetStrFromObject(metadata_root, "metadata-location"); + auto *metadata = yyjson_obj_get(metadata_root, "metadata"); + //table_result.table_id = TryGetStrFromObject(metadata, "table-uuid"); + + uint64_t current_schema_id = TryGetNumFromObject(metadata, "current-schema-id"); + auto *schemas = yyjson_obj_get(metadata, "schemas"); + yyjson_val *schema; + size_t schema_idx, schema_max; + bool found = false; + yyjson_arr_foreach(schemas, schema_idx, schema_max, schema) { + uint64_t schema_id = TryGetNumFromObject(schema, "schema-id"); + if (schema_id == current_schema_id) { + found = true; + auto *columns = yyjson_obj_get(schema, "fields"); + yyjson_val *col; + size_t col_idx, col_max; + yyjson_arr_foreach(columns, col_idx, col_max, col) { + auto column_definition = ParseColumnDefinition(col); + table.columns.push_back(column_definition); + } + } + } + + if (!found) { + throw InternalException("Current schema not found"); + } +} + +static IRCAPITable createTable(const string &catalog, const string &schema, const string &table_name) { + IRCAPITable table_result; + table_result.catalog_name = catalog; + table_result.schema_name = schema; + table_result.name = table_name; + table_result.data_source_format = "ICEBERG"; + table_result.table_id = "uuid-" + schema + "-" + "table"; + std::replace(table_result.table_id.begin(), table_result.table_id.end(), '_', '-'); + return table_result; +} + +IRCAPITable IRCAPI::GetTable( + const string &catalog, const string &internal, const string &schema, const string &table_name, optional_ptr credentials) { + + IRCAPITable table_result = createTable(catalog, schema, table_name); + if (credentials) { + string result = GetTableMetadata(internal, schema, table_result.name, *credentials); + std::unique_ptr doc(api_result_to_doc(result)); + auto *metadata_root = yyjson_doc_get_root(doc.get()); + populateTableMetadata(table_result, metadata_root); + } else { + // Skip fetching 
metadata, we'll do it later when we access the table + IRCAPIColumnDefinition col; + col.name = "__"; + col.type_text = "int"; + col.precision = -1; + col.scale = -1; + col.position = 0; + table_result.columns.push_back(col); + } + + return table_result; +} + +string IRCAPI::GetOptionallyPrefixedURL(const string &api_version, const string &prefix) { + D_ASSERT((int32_t)api_version.find(std::string("/")) < 0 && (int32_t)prefix.find(std::string("/")) < 0); + if (prefix.empty()) { + return "/" + api_version + "/"; + } + return "/" + api_version + "/" + prefix + "/"; +} + +// TODO: handle out-of-order columns using position property +vector IRCAPI::GetTables(const string &catalog, const string &internal, const string &schema, IRCCredentials credentials) { + vector result; + string api_result = GetRequest(credentials.endpoint + GetOptionallyPrefixedURL(IRCAPI::API_VERSION_1, internal) + "namespaces/" + schema + "/tables", credentials.token); + std::unique_ptr doc(api_result_to_doc(api_result)); + auto *root = yyjson_doc_get_root(doc.get()); + auto *tables = yyjson_obj_get(root, "identifiers"); + size_t idx, max; + yyjson_val *table; + yyjson_arr_foreach(tables, idx, max, table) { + auto table_result = GetTable(catalog, internal, schema, TryGetStrFromObject(table, "name"), nullptr); + result.push_back(table_result); + } + + return result; +} + +vector IRCAPI::GetSchemas(const string &catalog, const string &internal, IRCCredentials credentials) { + vector result; + string api_result = + GetRequest(credentials.endpoint + GetOptionallyPrefixedURL(IRCAPI::API_VERSION_1, internal) + "namespaces", credentials.token); + std::unique_ptr doc(api_result_to_doc(api_result)); + auto *root = yyjson_doc_get_root(doc.get()); + auto *schemas = yyjson_obj_get(root, "namespaces"); + size_t idx, max; + yyjson_val *schema; + yyjson_arr_foreach(schemas, idx, max, schema) { + IRCAPISchema schema_result; + schema_result.catalog_name = catalog; + yyjson_val *value = yyjson_arr_get(schema, 0); + 
schema_result.schema_name = yyjson_get_str(value); + result.push_back(schema_result); + } + + return result; +} + +IRCAPISchema IRCAPI::CreateSchema(const string &catalog, const string &internal, const string &schema, IRCCredentials credentials) { + string post_data = "{\"namespace\":[\"" + schema + "\"]}"; + string api_result = PostRequest( + credentials.endpoint + GetOptionallyPrefixedURL(IRCAPI::API_VERSION_1, internal) + "namespaces", post_data, "json", credentials.token); + api_result_to_doc(api_result); // if the method returns, request was successful + + IRCAPISchema schema_result; + schema_result.catalog_name = catalog; + schema_result.schema_name = schema; //yyjson_get_str(value); + return schema_result; +} + +void IRCAPI::DropSchema(const string &internal, const string &schema, IRCCredentials credentials) { + string api_result = DeleteRequest( + credentials.endpoint + GetOptionallyPrefixedURL(IRCAPI::API_VERSION_1, internal) + "namespaces/" + schema, credentials.token); + api_result_to_doc(api_result); // if the method returns, request was successful +} + +void IRCAPI::DropTable(const string &catalog, const string &internal, const string &schema, string &table_name, IRCCredentials credentials) { + string api_result = DeleteRequest( + credentials.endpoint + GetOptionallyPrefixedURL(IRCAPI::API_VERSION_1, internal) + "namespaces/" + schema + "/tables/" + table_name + "?purgeRequested=true", + credentials.token); + api_result_to_doc(api_result); // if the method returns, request was successful +} + +static std::string json_to_string(yyjson_mut_doc *doc, yyjson_write_flag flags = YYJSON_WRITE_PRETTY) { + char *json_chars = yyjson_mut_write(doc, flags, NULL); + std::string json_str(json_chars); + free(json_chars); + return json_str; +} + +IRCAPITable IRCAPI::CreateTable(const string &catalog, const string &internal, const string &schema, IRCCredentials credentials, CreateTableInfo *table_info) { + std::unique_ptr dd(yyjson_mut_doc_new(NULL)); + yyjson_mut_val 
*rr = yyjson_mut_obj(dd.get()); + yyjson_mut_doc_set_root(dd.get(), rr); + yyjson_mut_obj_add_str(dd.get(), rr, "name", table_info->table.c_str()); + + yyjson_mut_val *sch = yyjson_mut_obj(dd.get()); + yyjson_mut_obj_add_val(dd.get(), rr, "schema", sch); + yyjson_mut_obj_add_str(dd.get(), sch, "type", "struct"); + + yyjson_mut_val *fields = yyjson_mut_arr(dd.get()); + yyjson_mut_obj_add_val(dd.get(), sch, "fields", fields); + + std::vector column_names; + std::vector column_types; + for (auto &col : table_info->columns.Logical()) { + // Store column name and type in vectors + column_names.push_back(col.GetName()); + column_types.push_back(ICUtils::LogicalToIcebergType(col.GetType())); + // Add column object to JSON + yyjson_mut_val *col_obj = yyjson_mut_obj(dd.get()); + yyjson_mut_obj_add_int(dd.get(), col_obj, "id", col.Oid()); + yyjson_mut_obj_add_bool(dd.get(), col_obj, "required", true); + yyjson_mut_obj_add_str(dd.get(), col_obj, "name", column_names.back().c_str()); + yyjson_mut_obj_add_str(dd.get(), col_obj, "type", column_types.back().c_str()); + yyjson_mut_arr_add_val(fields, col_obj); + } + + yyjson_mut_val *props = yyjson_mut_obj(dd.get()); + yyjson_mut_obj_add_val(dd.get(), rr, "properties", props); + yyjson_mut_obj_add_str(dd.get(), props, "write.parquet.compression-codec", "snappy"); + + IRCAPITable table_result = createTable(catalog, schema, table_info->table); + string post_data = json_to_string(dd.get()); + struct curl_slist *extra_headers = NULL; + extra_headers = curl_slist_append(extra_headers, "X-Iceberg-Access-Delegation: vended-credentials"); + string api_result = PostRequest( + credentials.endpoint + GetOptionallyPrefixedURL(IRCAPI::API_VERSION_1, internal) + "namespaces/" + schema + "/tables", post_data, "json", credentials.token, extra_headers); + curl_slist_free_all(extra_headers); + std::unique_ptr doc(api_result_to_doc(api_result)); + auto *root = yyjson_doc_get_root(doc.get()); + populateTableMetadata(table_result, root); + return 
table_result; +} + +} // namespace duckdb diff --git a/src/catalog_utils.cpp b/src/catalog_utils.cpp new file mode 100644 index 0000000..23328d0 --- /dev/null +++ b/src/catalog_utils.cpp @@ -0,0 +1,236 @@ +#include "catalog_utils.hpp" +#include "duckdb/common/operator/cast_operators.hpp" +#include "storage/irc_schema_entry.hpp" +#include "storage/irc_transaction.hpp" + +#include + +namespace duckdb { + +string ICUtils::LogicalToIcebergType(const LogicalType &input) { + switch (input.id()) { + case LogicalType::TINYINT: + case LogicalType::UTINYINT: + return "tinyint"; + case LogicalType::SMALLINT: + case LogicalType::USMALLINT: + return "smallint"; + case LogicalType::INTEGER: + case LogicalType::UINTEGER: + return "int"; + case LogicalType::BIGINT: + case LogicalType::UBIGINT: + return "long"; + case LogicalType::VARCHAR: + return "string"; + case LogicalType::DOUBLE: + return "double"; + case LogicalType::FLOAT: + return "float"; + case LogicalType::BOOLEAN: + return "boolean"; + case LogicalType::TIMESTAMP: + return "timestamp"; + case LogicalType::TIMESTAMP_TZ: + return "timestamptz"; + case LogicalType::BLOB: + return "binary"; + case LogicalType::DATE: + return "date"; + case LogicalTypeId::DECIMAL: { + uint8_t precision = DecimalType::GetWidth(input); + uint8_t scale = DecimalType::GetScale(input); + return "decimal(" + std::to_string(precision) + ", " + std::to_string(scale) + ")"; + } + // case LogicalTypeId::ARRAY: + // case LogicalTypeId::STRUCT: + // case LogicalTypeId::MAP: + default: + break; + } + + throw std::runtime_error("Unsupported type: " + input.ToString()); +} + +string ICUtils::TypeToString(const LogicalType &input) { + switch (input.id()) { + case LogicalType::VARCHAR: + return "TEXT"; + case LogicalType::UTINYINT: + return "TINYINT UNSIGNED"; + case LogicalType::USMALLINT: + return "SMALLINT UNSIGNED"; + case LogicalType::UINTEGER: + return "INTEGER UNSIGNED"; + case LogicalType::UBIGINT: + return "BIGINT UNSIGNED"; + case 
LogicalType::TIMESTAMP: + return "DATETIME"; + case LogicalType::TIMESTAMP_TZ: + return "TIMESTAMP"; + default: + return input.ToString(); + } +} + +LogicalType ICUtils::TypeToLogicalType(ClientContext &context, const string &type_text) { + if (type_text == "tinyint") { + return LogicalType::TINYINT; + } else if (type_text == "smallint") { + return LogicalType::SMALLINT; + } else if (type_text == "bigint") { + return LogicalType::BIGINT; + } else if (type_text == "int") { + return LogicalType::INTEGER; + } else if (type_text == "long") { + return LogicalType::BIGINT; + } else if (type_text == "string") { + return LogicalType::VARCHAR; + } else if (type_text == "double") { + return LogicalType::DOUBLE; + } else if (type_text == "float") { + return LogicalType::FLOAT; + } else if (type_text == "boolean") { + return LogicalType::BOOLEAN; + } else if (type_text == "timestamp") { + return LogicalType::TIMESTAMP; + } else if (type_text == "timestamptz") { + return LogicalType::TIMESTAMP_TZ; + } else if (type_text == "binary") { + return LogicalType::BLOB; + } else if (type_text == "date") { + return LogicalType::DATE; + } else if (type_text.find("decimal(") == 0) { + size_t spec_end = type_text.find(')'); + if (spec_end != string::npos) { + size_t sep = type_text.find(','); + auto prec_str = type_text.substr(8, sep - 8); + auto scale_str = type_text.substr(sep + 1, spec_end - sep - 1); + uint8_t prec = Cast::Operation(prec_str); + uint8_t scale = Cast::Operation(scale_str); + return LogicalType::DECIMAL(prec, scale); + } + } else if (type_text.find("array<") == 0) { + size_t type_end = type_text.rfind('>'); // find last, to deal with nested + if (type_end != string::npos) { + auto child_type_str = type_text.substr(6, type_end - 6); + auto child_type = ICUtils::TypeToLogicalType(context, child_type_str); + return LogicalType::LIST(child_type); + } + } else if (type_text.find("map<") == 0) { + size_t type_end = type_text.rfind('>'); // find last, to deal with nested + if 
(type_end != string::npos) { + // TODO: Factor this and struct parsing into an iterator over ',' separated values + vector key_val; + size_t cur = 4; + auto nested_opens = 0; + for (;;) { + size_t next_sep = cur; + // find the location of the next ',' ignoring nested commas + while (type_text[next_sep] != ',' || nested_opens > 0) { + if (type_text[next_sep] == '<') { + nested_opens++; + } else if (type_text[next_sep] == '>') { + nested_opens--; + } + next_sep++; + if (next_sep == type_end) { + break; + } + } + auto child_str = type_text.substr(cur, next_sep - cur); + auto child_type = ICUtils::TypeToLogicalType(context, child_str); + key_val.push_back(child_type); + if (next_sep == type_end) { + break; + } + cur = next_sep + 1; + } + if (key_val.size() != 2) { + throw NotImplementedException("Invalid map specification with %i types", key_val.size()); + } + return LogicalType::MAP(key_val[0], key_val[1]); + } + } else if (type_text.find("struct<") == 0) { + size_t type_end = type_text.rfind('>'); // find last, to deal with nested + if (type_end != string::npos) { + child_list_t children; + size_t cur = 7; + auto nested_opens = 0; + for (;;) { + size_t next_sep = cur; + // find the location of the next ',' ignoring nested commas + while (type_text[next_sep] != ',' || nested_opens > 0) { + if (type_text[next_sep] == '<') { + nested_opens++; + } else if (type_text[next_sep] == '>') { + nested_opens--; + } + next_sep++; + if (next_sep == type_end) { + break; + } + } + auto child_str = type_text.substr(cur, next_sep - cur); + size_t type_sep = child_str.find(':'); + if (type_sep == string::npos) { + throw NotImplementedException("Invalid struct child type specifier: %s", child_str); + } + auto child_name = child_str.substr(0, type_sep); + auto child_type = ICUtils::TypeToLogicalType(context, child_str.substr(type_sep + 1, string::npos)); + children.push_back({child_name, child_type}); + if (next_sep == type_end) { + break; + } + cur = next_sep + 1; + } + return 
LogicalType::STRUCT(children); + } + } + + throw NotImplementedException("Tried to fallback to unknown type for '%s'", type_text); + // fallback for unknown types + return LogicalType::VARCHAR; +} + +LogicalType ICUtils::ToICType(const LogicalType &input) { + // todo do we need this mapping? + throw NotImplementedException("ToUCType not yet implemented"); + switch (input.id()) { + case LogicalTypeId::BOOLEAN: + case LogicalTypeId::SMALLINT: + case LogicalTypeId::INTEGER: + case LogicalTypeId::BIGINT: + case LogicalTypeId::TINYINT: + case LogicalTypeId::UTINYINT: + case LogicalTypeId::USMALLINT: + case LogicalTypeId::UINTEGER: + case LogicalTypeId::UBIGINT: + case LogicalTypeId::FLOAT: + case LogicalTypeId::DOUBLE: + case LogicalTypeId::BLOB: + case LogicalTypeId::DATE: + case LogicalTypeId::DECIMAL: + case LogicalTypeId::TIMESTAMP: + case LogicalTypeId::TIMESTAMP_TZ: + case LogicalTypeId::VARCHAR: + return input; + case LogicalTypeId::LIST: + throw NotImplementedException("PC does not support arrays - unsupported type \"%s\"", input.ToString()); + case LogicalTypeId::STRUCT: + case LogicalTypeId::MAP: + case LogicalTypeId::UNION: + throw NotImplementedException("PC does not support composite types - unsupported type \"%s\"", + input.ToString()); + case LogicalTypeId::TIMESTAMP_SEC: + case LogicalTypeId::TIMESTAMP_MS: + case LogicalTypeId::TIMESTAMP_NS: + return LogicalType::TIMESTAMP; + case LogicalTypeId::HUGEINT: + return LogicalType::DOUBLE; + default: + return LogicalType::VARCHAR; + } +} + +} // namespace duckdb diff --git a/src/iceberg_extension.cpp b/src/iceberg_extension.cpp index f82c6b4..7dc2f2e 100644 --- a/src/iceberg_extension.cpp +++ b/src/iceberg_extension.cpp @@ -1,20 +1,153 @@ #define DUCKDB_EXTENSION_MAIN #include "iceberg_extension.hpp" +#include "storage/irc_catalog.hpp" +#include "storage/irc_transaction_manager.hpp" #include "duckdb.hpp" +#include "duckdb/main/secret/secret_manager.hpp" #include "duckdb/common/exception.hpp" #include 
"duckdb/common/string_util.hpp" #include "duckdb/function/scalar_function.hpp" +#include "duckdb/main/extension_util.hpp" #include "duckdb/catalog/catalog_entry/macro_catalog_entry.hpp" #include "duckdb/catalog/default/default_functions.hpp" +#include "duckdb/parser/parsed_data/create_scalar_function_info.hpp" +#include "duckdb/parser/parsed_data/attach_info.hpp" +#include "duckdb/storage/storage_extension.hpp" #include "iceberg_functions.hpp" #include "yyjson.hpp" #include "duckdb/main/extension_util.hpp" #include +#include "catalog_api.hpp" namespace duckdb { +static unique_ptr CreateCatalogSecretFunction(ClientContext &, CreateSecretInput &input) { + // apply any overridden settings + vector prefix_paths; + auto result = make_uniq(prefix_paths, "iceberg", "config", input.name); + + for (const auto &named_param : input.options) { + auto lower_name = StringUtil::Lower(named_param.first); + + if (lower_name == "client_id" || + lower_name == "client_secret" || + lower_name == "endpoint" || + lower_name == "aws_region") { + result->secret_map[lower_name] = named_param.second.ToString(); + } else { + throw InternalException("Unknown named parameter passed to CreateIRCSecretFunction: " + lower_name); + } + } + + // Get token from catalog + result->secret_map["token"] = IRCAPI::GetToken( + result->secret_map["client_id"].ToString(), + result->secret_map["client_secret"].ToString(), + result->secret_map["endpoint"].ToString()); + + //! 
Set redact keys + result->redact_keys = {"token", "client_id", "client_secret"}; + + return std::move(result); +} + +static void SetCatalogSecretParameters(CreateSecretFunction &function) { + function.named_parameters["client_id"] = LogicalType::VARCHAR; + function.named_parameters["client_secret"] = LogicalType::VARCHAR; + function.named_parameters["endpoint"] = LogicalType::VARCHAR; + function.named_parameters["aws_region"] = LogicalType::VARCHAR; + function.named_parameters["token"] = LogicalType::VARCHAR; +} + +unique_ptr GetSecret(ClientContext &context, const string &secret_name) { + auto &secret_manager = SecretManager::Get(context); + auto transaction = CatalogTransaction::GetSystemCatalogTransaction(context); + // FIXME: this should be adjusted once the `GetSecretByName` API supports this + // use case + auto secret_entry = secret_manager.GetSecretByName(transaction, secret_name, "memory"); + if (secret_entry) { + return secret_entry; + } + secret_entry = secret_manager.GetSecretByName(transaction, secret_name, "local_file"); + if (secret_entry) { + return secret_entry; + } + return nullptr; +} + +static unique_ptr IcebergCatalogAttach(StorageExtensionInfo *storage_info, ClientContext &context, + AttachedDatabase &db, const string &name, AttachInfo &info, + AccessMode access_mode) { + IRCCredentials credentials; + + // check if we have a secret provided + string secret_name; + for (auto &entry : info.options) { + auto lower_name = StringUtil::Lower(entry.first); + if (lower_name == "type" || lower_name == "read_only") { + // already handled + } else if (lower_name == "secret") { + secret_name = entry.second.ToString(); + } else { + throw BinderException("Unrecognized option for PC attach: %s", entry.first); + } + } + + // if no secret is specified we default to the unnamed mysql secret, if it + // exists + bool explicit_secret = !secret_name.empty(); + if (!explicit_secret) { + // look up settings from the default unnamed mysql secret if none is + // 
provided + secret_name = "__default_iceberg"; + } + + string connection_string = info.path; + auto secret_entry = GetSecret(context, secret_name); + if (secret_entry) { + // secret found - read data + const auto &kv_secret = dynamic_cast(*secret_entry->secret); + string new_connection_info; + + Value token_val = kv_secret.TryGetValue("token"); + if (token_val.IsNull()) { + throw std::runtime_error("Token is blank"); + } + credentials.token = token_val.ToString(); + + Value endpoint_val = kv_secret.TryGetValue("endpoint"); + credentials.endpoint = endpoint_val.IsNull() ? "" : endpoint_val.ToString(); + StringUtil::RTrim(credentials.endpoint, "/"); + + Value aws_region_val = kv_secret.TryGetValue("aws_region"); + credentials.aws_region = aws_region_val.IsNull() ? "" : aws_region_val.ToString(); + + } else if (explicit_secret) { + // secret not found and one was explicitly provided - throw an error + throw BinderException("Secret with name \"%s\" not found", secret_name); + } + + // TODO: Check catalog with name actually exists! 
+ + return make_uniq(db, info.path, access_mode, credentials); +} + +static unique_ptr CreateTransactionManager(StorageExtensionInfo *storage_info, AttachedDatabase &db, + Catalog &catalog) { + auto &ic_catalog = catalog.Cast(); + return make_uniq(db, ic_catalog); +} + +class ICCatalogStorageExtension : public StorageExtension { +public: + ICCatalogStorageExtension() { + attach = IcebergCatalogAttach; + create_transaction_manager = CreateTransactionManager; + } +}; + static void LoadInternal(DatabaseInstance &instance) { auto &config = DBConfig::GetConfig(instance); @@ -34,6 +167,20 @@ static void LoadInternal(DatabaseInstance &instance) { for (auto &fun : IcebergFunctions::GetScalarFunctions()) { ExtensionUtil::RegisterFunction(instance, fun); } + + IRCAPI::InitializeCurl(); + + SecretType secret_type; + secret_type.name = "iceberg"; + secret_type.deserializer = KeyValueSecret::Deserialize; + secret_type.default_provider = "config"; + + ExtensionUtil::RegisterSecretType(instance, secret_type); + CreateSecretFunction secret_function = {"iceberg", "config", CreateCatalogSecretFunction}; + SetCatalogSecretParameters(secret_function); + ExtensionUtil::RegisterFunction(instance, secret_function); + + config.storage_extensions["iceberg"] = make_uniq(); } void IcebergExtension::Load(DuckDB &db) { diff --git a/src/include/catalog_api.hpp b/src/include/catalog_api.hpp new file mode 100644 index 0000000..290c55d --- /dev/null +++ b/src/include/catalog_api.hpp @@ -0,0 +1,65 @@ + +#pragma once + +#include "duckdb/common/types.hpp" +#include "duckdb/parser/parsed_data/create_table_info.hpp" + +namespace duckdb { +struct IRCCredentials; + +struct IRCAPIColumnDefinition { + string name; + string type_text; + idx_t precision; + idx_t scale; + idx_t position; +}; + +struct IRCAPITable { + string table_id; + + string name; + string catalog_name; + string schema_name; + string table_type; + string data_source_format; + string storage_location; + + vector columns; +}; + +struct 
IRCAPISchema { + string schema_name; + string catalog_name; +}; + +struct IRCAPITableCredentials { + string key_id; + string secret; + string session_token; +}; + +class IRCAPI { +public: + static const string API_VERSION_1; + + //! WARNING: not thread-safe. To be called once on extension initialization + static void InitializeCurl(); + + // The {prefix} for a catalog is always optional according to the iceberg spec. So no need to + // add it if it is not defined. + static string GetOptionallyPrefixedURL(const string &api_version, const string &prefix); + static IRCAPITableCredentials GetTableCredentials(const string &internal, const string &schema, const string &table, IRCCredentials credentials); + static vector GetCatalogs(const string &catalog, IRCCredentials credentials); + static vector GetTables(const string &catalog, const string &internal, const string &schema, IRCCredentials credentials); + static IRCAPITable GetTable(const string &catalog, const string &internal, const string &schema, const string &table_name, optional_ptr credentials); + static vector GetSchemas(const string &catalog, const string &internal, IRCCredentials credentials); + static vector GetTablesInSchema(const string &catalog, const string &schema, IRCCredentials credentials); + static string GetToken(string id, string secret, string endpoint); + static IRCAPISchema CreateSchema(const string &catalog, const string &internal, const string &schema, IRCCredentials credentials); + static void DropSchema(const string &internal, const string &schema, IRCCredentials credentials); + static IRCAPITable CreateTable(const string &catalog, const string &internal, const string &schema, IRCCredentials credentials, CreateTableInfo *table_info); + static void DropTable(const string &catalog, const string &internal, const string &schema, string &table_name, IRCCredentials credentials); +}; + +} // namespace duckdb diff --git a/src/include/catalog_utils.hpp b/src/include/catalog_utils.hpp new file mode 
100644 index 0000000..b1569e5 --- /dev/null +++ b/src/include/catalog_utils.hpp @@ -0,0 +1,27 @@ + +#pragma once + +#include "duckdb.hpp" +#include "catalog_api.hpp" + +namespace duckdb { +class ICSchemaEntry; +class ICTransaction; + +enum class ICTypeAnnotation { STANDARD, CAST_TO_VARCHAR, NUMERIC_AS_DOUBLE, CTID, JSONB, FIXED_LENGTH_CHAR }; + +struct ICType { + idx_t oid = 0; + ICTypeAnnotation info = ICTypeAnnotation::STANDARD; + vector children; +}; + +class ICUtils { +public: + static LogicalType ToICType(const LogicalType &input); + static LogicalType TypeToLogicalType(ClientContext &context, const string &columnDefinition); + static string TypeToString(const LogicalType &input); + static string LogicalToIcebergType(const LogicalType &input); +}; + +} // namespace duckdb diff --git a/src/include/storage/irc_catalog.hpp b/src/include/storage/irc_catalog.hpp new file mode 100644 index 0000000..2295a66 --- /dev/null +++ b/src/include/storage/irc_catalog.hpp @@ -0,0 +1,81 @@ + +#pragma once + +#include "duckdb/catalog/catalog.hpp" +#include "duckdb/function/table_function.hpp" +#include "duckdb/common/enums/access_mode.hpp" +#include "storage/irc_schema_set.hpp" + +namespace duckdb { +class ICSchemaEntry; + +struct IRCCredentials { + string endpoint; + string client_id; + string client_secret; + // required to query s3 tables + string aws_region; + // Catalog generates the token using client id & secret + string token; +}; + +class ICRClearCacheFunction : public TableFunction { +public: + ICRClearCacheFunction(); + + static void ClearCacheOnSetting(ClientContext &context, SetScope scope, Value ¶meter); +}; + + +class IRCatalog : public Catalog { +public: + explicit IRCatalog(AttachedDatabase &db_p, const string &internal_name, AccessMode access_mode, + IRCCredentials credentials); + ~IRCatalog(); + + string internal_name; + AccessMode access_mode; + IRCCredentials credentials; + +public: + void Initialize(bool load_builtin) override; + string GetCatalogType() 
override { + return "iceberg"; + } + + optional_ptr CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) override; + + void ScanSchemas(ClientContext &context, std::function callback) override; + + optional_ptr GetSchema(CatalogTransaction transaction, const string &schema_name, + OnEntryNotFound if_not_found, + QueryErrorContext error_context = QueryErrorContext()) override; + + unique_ptr PlanInsert(ClientContext &context, LogicalInsert &op, + unique_ptr plan) override; + unique_ptr PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, + unique_ptr plan) override; + unique_ptr PlanDelete(ClientContext &context, LogicalDelete &op, + unique_ptr plan) override; + unique_ptr PlanUpdate(ClientContext &context, LogicalUpdate &op, + unique_ptr plan) override; + unique_ptr BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, + unique_ptr plan) override; + + DatabaseSize GetDatabaseSize(ClientContext &context) override; + + //! Whether or not this is an in-memory PC database + bool InMemory() override; + string GetDBPath() override; + + void ClearCache(); + +private: + void DropSchema(ClientContext &context, DropInfo &info) override; + +private: + ICSchemaSet schemas; + string default_schema; +}; + +} // namespace duckdb diff --git a/src/include/storage/irc_catalog_set.hpp b/src/include/storage/irc_catalog_set.hpp new file mode 100644 index 0000000..927c6a2 --- /dev/null +++ b/src/include/storage/irc_catalog_set.hpp @@ -0,0 +1,37 @@ + +#pragma once + +#include "duckdb/transaction/transaction.hpp" +#include "duckdb/common/case_insensitive_map.hpp" +#include "duckdb/common/mutex.hpp" + +namespace duckdb { +struct DropInfo; +class ICSchemaEntry; +class ICTransaction; + +class ICCatalogSet { +public: + ICCatalogSet(Catalog &catalog); + + optional_ptr GetEntry(ClientContext &context, const string &name); + virtual void DropEntry(ClientContext &context, DropInfo &info); + void Scan(ClientContext &context, const 
std::function &callback); + virtual optional_ptr CreateEntry(unique_ptr entry); + void ClearEntries(); + +protected: + virtual void LoadEntries(ClientContext &context) = 0; + virtual void FillEntry(ClientContext &context, unique_ptr &entry) = 0; + + void EraseEntryInternal(const string &name); + +protected: + Catalog &catalog; + case_insensitive_map_t> entries; + +private: + mutex entry_lock; +}; + +} // namespace duckdb diff --git a/src/include/storage/irc_schema_entry.hpp b/src/include/storage/irc_schema_entry.hpp new file mode 100644 index 0000000..ebfa297 --- /dev/null +++ b/src/include/storage/irc_schema_entry.hpp @@ -0,0 +1,46 @@ + +#pragma once + +#include "catalog_api.hpp" +#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp" +#include "storage/irc_table_set.hpp" + +namespace duckdb { +class ICTransaction; + +class ICSchemaEntry : public SchemaCatalogEntry { +public: + ICSchemaEntry(Catalog &catalog, CreateSchemaInfo &info); + ~ICSchemaEntry() override; + + unique_ptr schema_data; + +public: + optional_ptr CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) override; + optional_ptr CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) override; + optional_ptr CreateIndex(CatalogTransaction transaction, CreateIndexInfo &info, + TableCatalogEntry &table) override; + optional_ptr CreateView(CatalogTransaction transaction, CreateViewInfo &info) override; + optional_ptr CreateSequence(CatalogTransaction transaction, CreateSequenceInfo &info) override; + optional_ptr CreateTableFunction(CatalogTransaction transaction, + CreateTableFunctionInfo &info) override; + optional_ptr CreateCopyFunction(CatalogTransaction transaction, + CreateCopyFunctionInfo &info) override; + optional_ptr CreatePragmaFunction(CatalogTransaction transaction, + CreatePragmaFunctionInfo &info) override; + optional_ptr CreateCollation(CatalogTransaction transaction, CreateCollationInfo &info) override; + optional_ptr 
CreateType(CatalogTransaction transaction, CreateTypeInfo &info) override; + void Alter(CatalogTransaction transaction, AlterInfo &info) override; + void Scan(ClientContext &context, CatalogType type, const std::function &callback) override; + void Scan(CatalogType type, const std::function &callback) override; + void DropEntry(ClientContext &context, DropInfo &info) override; + optional_ptr GetEntry(CatalogTransaction transaction, CatalogType type, const string &name) override; + +private: + ICCatalogSet &GetCatalogSet(CatalogType type); + +private: + ICTableSet tables; +}; + +} // namespace duckdb diff --git a/src/include/storage/irc_schema_set.hpp b/src/include/storage/irc_schema_set.hpp new file mode 100644 index 0000000..8302c76 --- /dev/null +++ b/src/include/storage/irc_schema_set.hpp @@ -0,0 +1,23 @@ + +#pragma once + +#include "storage/irc_catalog_set.hpp" +#include "storage/irc_schema_entry.hpp" + +namespace duckdb { +struct CreateSchemaInfo; + +class ICSchemaSet : public ICCatalogSet { +public: + explicit ICSchemaSet(Catalog &catalog); + +public: + optional_ptr CreateSchema(ClientContext &context, CreateSchemaInfo &info); + void DropSchema(ClientContext &context, DropInfo &info); + +protected: + void LoadEntries(ClientContext &context) override; + void FillEntry(ClientContext &context, unique_ptr &entry) override; +}; + +} // namespace duckdb diff --git a/src/include/storage/irc_table_entry.hpp b/src/include/storage/irc_table_entry.hpp new file mode 100644 index 0000000..695404c --- /dev/null +++ b/src/include/storage/irc_table_entry.hpp @@ -0,0 +1,46 @@ + +#pragma once + +#include "catalog_api.hpp" +#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" +#include "duckdb/parser/parsed_data/create_table_info.hpp" + +namespace duckdb { + +struct ICTableInfo { + ICTableInfo() { + create_info = make_uniq(); + } + ICTableInfo(const string &schema, const string &table) { + create_info = make_uniq(string(), schema, table); + } + ICTableInfo(const 
SchemaCatalogEntry &schema, const string &table) { + create_info = make_uniq((SchemaCatalogEntry &)schema, table); + } + + const string &GetTableName() const { + return create_info->table; + } + + unique_ptr create_info; +}; + +class ICTableEntry : public TableCatalogEntry { +public: + ICTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info); + ICTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, ICTableInfo &info); + + unique_ptr table_data; + +public: + unique_ptr GetStatistics(ClientContext &context, column_t column_id) override; + + TableFunction GetScanFunction(ClientContext &context, unique_ptr &bind_data) override; + + TableStorageInfo GetStorageInfo(ClientContext &context) override; + + void BindUpdateConstraints(Binder &binder, LogicalGet &get, LogicalProjection &proj, LogicalUpdate &update, + ClientContext &context) override; +}; + +} // namespace duckdb diff --git a/src/include/storage/irc_table_set.hpp b/src/include/storage/irc_table_set.hpp new file mode 100644 index 0000000..2404cc4 --- /dev/null +++ b/src/include/storage/irc_table_set.hpp @@ -0,0 +1,51 @@ + +#pragma once + +#include "storage/irc_catalog_set.hpp" +#include "storage/irc_table_entry.hpp" + +namespace duckdb { +struct CreateTableInfo; +class ICResult; +class ICSchemaEntry; + + +class ICInSchemaSet : public ICCatalogSet { +public: + ICInSchemaSet(ICSchemaEntry &schema); + + optional_ptr CreateEntry(unique_ptr entry) override; + +protected: + ICSchemaEntry &schema; +}; + + +class ICTableSet : public ICInSchemaSet { +public: + explicit ICTableSet(ICSchemaEntry &schema); + +public: + optional_ptr CreateTable(ClientContext &context, BoundCreateTableInfo &info); + static unique_ptr GetTableInfo(ClientContext &context, ICSchemaEntry &schema, const string &table_name); + optional_ptr RefreshTable(ClientContext &context, const string &table_name); + void AlterTable(ClientContext &context, AlterTableInfo &info); + void DropTable(ClientContext &context, DropInfo 
&info); + +protected: + void LoadEntries(ClientContext &context) override; + void FillEntry(ClientContext &context, unique_ptr &entry) override; + + void AlterTable(ClientContext &context, RenameTableInfo &info); + void AlterTable(ClientContext &context, RenameColumnInfo &info); + void AlterTable(ClientContext &context, AddColumnInfo &info); + void AlterTable(ClientContext &context, RemoveColumnInfo &info); + + static void AddColumn(ClientContext &context, ICResult &result, ICTableInfo &table_info, idx_t column_offset = 0); + +private: + unique_ptr _CreateCatalogEntry(ClientContext &context, IRCAPITable table); +}; + + +} // namespace duckdb diff --git a/src/include/storage/irc_transaction.hpp b/src/include/storage/irc_transaction.hpp new file mode 100644 index 0000000..43ab54b --- /dev/null +++ b/src/include/storage/irc_transaction.hpp @@ -0,0 +1,32 @@ + +#pragma once + +#include "duckdb/transaction/transaction.hpp" + +namespace duckdb { +class IRCatalog; +class ICSchemaEntry; +class ICTableEntry; + +enum class ICTransactionState { TRANSACTION_NOT_YET_STARTED, TRANSACTION_STARTED, TRANSACTION_FINISHED }; + +class ICTransaction : public Transaction { +public: + ICTransaction(IRCatalog &ic_catalog, TransactionManager &manager, ClientContext &context); + ~ICTransaction() override; + + void Start(); + void Commit(); + void Rollback(); + + static ICTransaction &Get(ClientContext &context, Catalog &catalog); + AccessMode GetAccessMode() const { + return access_mode; + } + +private: + ICTransactionState transaction_state; + AccessMode access_mode; +}; + +} // namespace duckdb diff --git a/src/include/storage/irc_transaction_manager.hpp b/src/include/storage/irc_transaction_manager.hpp new file mode 100644 index 0000000..4d217fc --- /dev/null +++ b/src/include/storage/irc_transaction_manager.hpp @@ -0,0 +1,26 @@ + +#pragma once + +#include "duckdb/transaction/transaction_manager.hpp" +#include "storage/irc_catalog.hpp" +#include "storage/irc_transaction.hpp" + +namespace 
duckdb { + +class ICTransactionManager : public TransactionManager { +public: + ICTransactionManager(AttachedDatabase &db_p, IRCatalog &ic_catalog); + + Transaction &StartTransaction(ClientContext &context) override; + ErrorData CommitTransaction(ClientContext &context, Transaction &transaction) override; + void RollbackTransaction(Transaction &transaction) override; + + void Checkpoint(ClientContext &context, bool force = false) override; + +private: + IRCatalog &ic_catalog; + mutex transaction_lock; + reference_map_t> transactions; +}; + +} // namespace duckdb diff --git a/src/storage/irc_catalog.cpp b/src/storage/irc_catalog.cpp new file mode 100644 index 0000000..4151880 --- /dev/null +++ b/src/storage/irc_catalog.cpp @@ -0,0 +1,100 @@ +#include "storage/irc_catalog.hpp" +#include "storage/irc_schema_entry.hpp" +#include "storage/irc_transaction.hpp" +#include "duckdb/storage/database_size.hpp" +#include "duckdb/parser/parsed_data/drop_info.hpp" +#include "duckdb/parser/parsed_data/create_schema_info.hpp" +#include "duckdb/main/attached_database.hpp" + +namespace duckdb { + +IRCatalog::IRCatalog(AttachedDatabase &db_p, const string &internal_name, AccessMode access_mode, + IRCCredentials credentials) + : Catalog(db_p), internal_name(internal_name), access_mode(access_mode), credentials(std::move(credentials)), + schemas(*this) { +} + +IRCatalog::~IRCatalog() = default; + +void IRCatalog::Initialize(bool load_builtin) { +} + +optional_ptr IRCatalog::CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) { + if (info.on_conflict == OnCreateConflict::REPLACE_ON_CONFLICT) { + DropInfo try_drop; + try_drop.type = CatalogType::SCHEMA_ENTRY; + try_drop.name = info.schema; + try_drop.if_not_found = OnEntryNotFound::RETURN_NULL; + try_drop.cascade = false; + schemas.DropSchema(transaction.GetContext(), try_drop); + } + return schemas.CreateSchema(transaction.GetContext(), info); +} + +void IRCatalog::DropSchema(ClientContext &context, DropInfo &info) { + 
return schemas.DropSchema(context, info); +} + +void IRCatalog::ScanSchemas(ClientContext &context, std::function callback) { + schemas.Scan(context, [&](CatalogEntry &schema) { callback(schema.Cast()); }); +} + +optional_ptr IRCatalog::GetSchema(CatalogTransaction transaction, const string &schema_name, + OnEntryNotFound if_not_found, QueryErrorContext error_context) { + if (schema_name == DEFAULT_SCHEMA) { + if (default_schema.empty()) { + throw InvalidInputException("Attempting to fetch the default schema - but no database was " + "provided in the connection string"); + } + return GetSchema(transaction, default_schema, if_not_found, error_context); + } + auto entry = schemas.GetEntry(transaction.GetContext(), schema_name); + if (!entry && if_not_found != OnEntryNotFound::RETURN_NULL) { + throw BinderException("Schema with name \"%s\" not found", schema_name); + } + return reinterpret_cast(entry.get()); +} + +bool IRCatalog::InMemory() { + return false; +} + +string IRCatalog::GetDBPath() { + return internal_name; +} + +DatabaseSize IRCatalog::GetDatabaseSize(ClientContext &context) { + if (default_schema.empty()) { + throw InvalidInputException("Attempting to fetch the database size - but no database was provided " + "in the connection string"); + } + DatabaseSize size; + return size; +} + +void IRCatalog::ClearCache() { + schemas.ClearEntries(); +} + +unique_ptr IRCatalog::PlanInsert(ClientContext &context, LogicalInsert &op, + unique_ptr plan) { + throw NotImplementedException("ICCatalog PlanInsert"); +} +unique_ptr IRCatalog::PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, + unique_ptr plan) { + throw NotImplementedException("ICCatalog PlanCreateTableAs"); +} +unique_ptr IRCatalog::PlanDelete(ClientContext &context, LogicalDelete &op, + unique_ptr plan) { + throw NotImplementedException("ICCatalog PlanDelete"); +} +unique_ptr IRCatalog::PlanUpdate(ClientContext &context, LogicalUpdate &op, + unique_ptr plan) { + throw 
NotImplementedException("ICCatalog PlanUpdate"); +} +unique_ptr IRCatalog::BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, + unique_ptr plan) { + throw NotImplementedException("ICCatalog BindCreateIndex"); +} + +} // namespace duckdb diff --git a/src/storage/irc_catalog_set.cpp b/src/storage/irc_catalog_set.cpp new file mode 100644 index 0000000..0cc1af8 --- /dev/null +++ b/src/storage/irc_catalog_set.cpp @@ -0,0 +1,64 @@ +#include "storage/irc_catalog_set.hpp" +#include "storage/irc_transaction.hpp" +#include "duckdb/parser/parsed_data/drop_info.hpp" +#include "storage/irc_schema_entry.hpp" + +namespace duckdb { + +ICCatalogSet::ICCatalogSet(Catalog &catalog) : catalog(catalog) { +} + +optional_ptr ICCatalogSet::GetEntry(ClientContext &context, const string &name) { + LoadEntries(context); + lock_guard l(entry_lock); + auto entry = entries.find(name); + if (entry == entries.end()) { + return nullptr; + } + FillEntry(context, entry->second); + return entry->second.get(); +} + +void ICCatalogSet::DropEntry(ClientContext &context, DropInfo &info) { + EraseEntryInternal(info.name); +} + +void ICCatalogSet::EraseEntryInternal(const string &name) { + lock_guard l(entry_lock); + entries.erase(name); +} + +void ICCatalogSet::Scan(ClientContext &context, const std::function &callback) { + LoadEntries(context); + + lock_guard l(entry_lock); + for (auto &entry : entries) { + callback(*entry.second); + } +} + +optional_ptr ICCatalogSet::CreateEntry(unique_ptr entry) { + lock_guard l(entry_lock); + auto result = entry.get(); + if (result->name.empty()) { + throw InternalException("ICCatalogSet::CreateEntry called with empty name"); + } + entries.insert(make_pair(result->name, std::move(entry))); + return result; +} + +void ICCatalogSet::ClearEntries() { + entries.clear(); +} + +ICInSchemaSet::ICInSchemaSet(ICSchemaEntry &schema) : ICCatalogSet(schema.ParentCatalog()), schema(schema) { +} + +optional_ptr ICInSchemaSet::CreateEntry(unique_ptr 
entry) { + if (!entry->internal) { + entry->internal = schema.internal; + } + return ICCatalogSet::CreateEntry(std::move(entry)); +} + +} // namespace duckdb diff --git a/src/storage/irc_clear_cache.cpp b/src/storage/irc_clear_cache.cpp new file mode 100644 index 0000000..b44386c --- /dev/null +++ b/src/storage/irc_clear_cache.cpp @@ -0,0 +1,50 @@ +#include "duckdb.hpp" + +#include "duckdb/parser/parsed_data/create_table_function_info.hpp" +#include "duckdb/main/database_manager.hpp" +#include "duckdb/main/attached_database.hpp" +#include "storage/irc_catalog.hpp" + +namespace duckdb { + +struct ClearCacheFunctionData : public TableFunctionData { + bool finished = false; +}; + +static unique_ptr ClearCacheBind(ClientContext &context, TableFunctionBindInput &input, + vector &return_types, vector &names) { + + auto result = make_uniq(); + return_types.push_back(LogicalType::BOOLEAN); + names.emplace_back("Success"); + return std::move(result); +} + +static void ClearIRCCaches(ClientContext &context) { + auto databases = DatabaseManager::Get(context).GetDatabases(context); + for (auto &db_ref : databases) { + auto &db = db_ref.get(); + auto &catalog = db.GetCatalog(); + if (catalog.GetCatalogType() != "iceberg") { + continue; + } + catalog.Cast().ClearCache(); + } +} + +static void ClearCacheFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) { + auto &data = data_p.bind_data->CastNoConst(); + if (data.finished) { + return; + } + ClearIRCCaches(context); + data.finished = true; +} + +void ICRClearCacheFunction::ClearCacheOnSetting(ClientContext &context, SetScope scope, Value ¶meter) { + ClearIRCCaches(context); +} + +ICRClearCacheFunction::ICRClearCacheFunction() : TableFunction("pc_clear_cache", {}, ClearCacheFunction, ClearCacheBind) { +} +} // namespace duckdb diff --git a/src/storage/irc_schema_entry.cpp b/src/storage/irc_schema_entry.cpp new file mode 100644 index 0000000..66172ff --- /dev/null +++ b/src/storage/irc_schema_entry.cpp 
@@ -0,0 +1,165 @@ +#include "storage/irc_schema_entry.hpp" +#include "storage/irc_table_entry.hpp" +#include "storage/irc_transaction.hpp" +#include "duckdb/parser/parsed_data/create_view_info.hpp" +#include "duckdb/parser/parsed_data/create_index_info.hpp" +#include "duckdb/planner/parsed_data/bound_create_table_info.hpp" +#include "duckdb/parser/parsed_data/drop_info.hpp" +#include "duckdb/parser/constraints/list.hpp" +#include "duckdb/common/unordered_set.hpp" +#include "duckdb/parser/parsed_data/alter_info.hpp" +#include "duckdb/parser/parsed_data/alter_table_info.hpp" +#include "duckdb/parser/parsed_expression_iterator.hpp" + +namespace duckdb { + +ICSchemaEntry::ICSchemaEntry(Catalog &catalog, CreateSchemaInfo &info) + : SchemaCatalogEntry(catalog, info), tables(*this) { +} + +ICSchemaEntry::~ICSchemaEntry() { +} + +ICTransaction &GetUCTransaction(CatalogTransaction transaction) { + if (!transaction.transaction) { + throw InternalException("No transaction!?"); + } + return transaction.transaction->Cast(); +} + +optional_ptr ICSchemaEntry::CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) { + auto &base_info = info.Base(); + auto table_name = base_info.table; + if (base_info.on_conflict == OnCreateConflict::REPLACE_ON_CONFLICT) { + throw NotImplementedException("REPLACE ON CONFLICT in CreateTable"); + } + return tables.CreateTable(transaction.GetContext(), info); +} + +void ICSchemaEntry::DropEntry(ClientContext &context, DropInfo &info) { + if (info.type != CatalogType::TABLE_ENTRY) { + throw BinderException("Expecting table entry"); + } + tables.DropTable(context, info); + GetCatalogSet(info.type).DropEntry(context, info); +} + +optional_ptr ICSchemaEntry::CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) { + throw BinderException("PC databases do not support creating functions"); +} + +void ICUnqualifyColumnRef(ParsedExpression &expr) { + if (expr.type == ExpressionType::COLUMN_REF) { + auto &colref = 
expr.Cast(); + auto name = std::move(colref.column_names.back()); + colref.column_names = {std::move(name)}; + return; + } + ParsedExpressionIterator::EnumerateChildren(expr, ICUnqualifyColumnRef); +} + +optional_ptr ICSchemaEntry::CreateIndex(CatalogTransaction transaction, CreateIndexInfo &info, + TableCatalogEntry &table) { + throw NotImplementedException("CreateIndex"); +} + +string GetUCCreateView(CreateViewInfo &info) { + throw NotImplementedException("GetCreateView"); +} + +optional_ptr ICSchemaEntry::CreateView(CatalogTransaction transaction, CreateViewInfo &info) { + if (info.sql.empty()) { + throw BinderException("Cannot create view that originated from an " + "empty SQL statement"); + } + if (info.on_conflict == OnCreateConflict::REPLACE_ON_CONFLICT || + info.on_conflict == OnCreateConflict::IGNORE_ON_CONFLICT) { + auto current_entry = GetEntry(transaction, CatalogType::VIEW_ENTRY, info.view_name); + if (current_entry) { + if (info.on_conflict == OnCreateConflict::IGNORE_ON_CONFLICT) { + return current_entry; + } + throw NotImplementedException("REPLACE ON CONFLICT in CreateView"); + } + } + auto &ic_transaction = GetUCTransaction(transaction); + // ic_transaction.Query(GetUCCreateView(info)); + return tables.RefreshTable(transaction.GetContext(), info.view_name); +} + +optional_ptr ICSchemaEntry::CreateType(CatalogTransaction transaction, CreateTypeInfo &info) { + throw BinderException("PC databases do not support creating types"); +} + +optional_ptr ICSchemaEntry::CreateSequence(CatalogTransaction transaction, CreateSequenceInfo &info) { + throw BinderException("PC databases do not support creating sequences"); +} + +optional_ptr ICSchemaEntry::CreateTableFunction(CatalogTransaction transaction, + CreateTableFunctionInfo &info) { + throw BinderException("PC databases do not support creating table functions"); +} + +optional_ptr ICSchemaEntry::CreateCopyFunction(CatalogTransaction transaction, + CreateCopyFunctionInfo &info) { + throw 
BinderException("PC databases do not support creating copy functions"); +} + +optional_ptr ICSchemaEntry::CreatePragmaFunction(CatalogTransaction transaction, + CreatePragmaFunctionInfo &info) { + throw BinderException("PC databases do not support creating pragma functions"); +} + +optional_ptr ICSchemaEntry::CreateCollation(CatalogTransaction transaction, CreateCollationInfo &info) { + throw BinderException("PC databases do not support creating collations"); +} + +void ICSchemaEntry::Alter(CatalogTransaction transaction, AlterInfo &info) { + if (info.type != AlterType::ALTER_TABLE) { + throw BinderException("Only altering tables is supported for now"); + } + auto &alter = info.Cast(); + tables.AlterTable(transaction.GetContext(), alter); +} + +bool CatalogTypeIsSupported(CatalogType type) { + switch (type) { + case CatalogType::INDEX_ENTRY: + case CatalogType::TABLE_ENTRY: + case CatalogType::VIEW_ENTRY: + return true; + default: + return false; + } +} + +void ICSchemaEntry::Scan(ClientContext &context, CatalogType type, + const std::function &callback) { + if (!CatalogTypeIsSupported(type)) { + return; + } + GetCatalogSet(type).Scan(context, callback); +} +void ICSchemaEntry::Scan(CatalogType type, const std::function &callback) { + throw NotImplementedException("Scan without context not supported"); +} + +optional_ptr ICSchemaEntry::GetEntry(CatalogTransaction transaction, CatalogType type, + const string &name) { + if (!CatalogTypeIsSupported(type)) { + return nullptr; + } + return GetCatalogSet(type).GetEntry(transaction.GetContext(), name); +} + +ICCatalogSet &ICSchemaEntry::GetCatalogSet(CatalogType type) { + switch (type) { + case CatalogType::TABLE_ENTRY: + case CatalogType::VIEW_ENTRY: + return tables; + default: + throw InternalException("Type not supported for GetCatalogSet"); + } +} + +} // namespace duckdb diff --git a/src/storage/irc_schema_set.cpp b/src/storage/irc_schema_set.cpp new file mode 100644 index 0000000..81fdc04 --- /dev/null +++ 
b/src/storage/irc_schema_set.cpp @@ -0,0 +1,49 @@ +#include "catalog_api.hpp" +#include "duckdb/parser/parsed_data/create_schema_info.hpp" +#include "duckdb/parser/parsed_data/drop_info.hpp" +#include "duckdb/catalog/catalog.hpp" +#include "storage/irc_catalog.hpp" +#include "storage/irc_schema_set.hpp" +#include "storage/irc_transaction.hpp" + +namespace duckdb { + +ICSchemaSet::ICSchemaSet(Catalog &catalog) : ICCatalogSet(catalog) { +} + +void ICSchemaSet::LoadEntries(ClientContext &context) { + if (!entries.empty()) { + return; + } + + auto &ic_catalog = catalog.Cast(); + auto schemas = IRCAPI::GetSchemas(catalog.GetName(), ic_catalog.internal_name, ic_catalog.credentials); + for (const auto &schema : schemas) { + CreateSchemaInfo info; + info.schema = schema.schema_name; + info.internal = false; + auto schema_entry = make_uniq(catalog, info); + schema_entry->schema_data = make_uniq(schema); + CreateEntry(std::move(schema_entry)); + } +} + +void ICSchemaSet::FillEntry(ClientContext &context, unique_ptr &entry) { + // Nothing to do +} + +optional_ptr ICSchemaSet::CreateSchema(ClientContext &context, CreateSchemaInfo &info) { + auto &ic_catalog = catalog.Cast(); + auto schema = IRCAPI::CreateSchema(catalog.GetName(), ic_catalog.internal_name, info.schema, ic_catalog.credentials); + auto schema_entry = make_uniq(catalog, info); + schema_entry->schema_data = make_uniq(schema); + return CreateEntry(std::move(schema_entry)); +} + +void ICSchemaSet::DropSchema(ClientContext &context, DropInfo &info) { + auto &ic_catalog = catalog.Cast(); + IRCAPI::DropSchema(ic_catalog.internal_name, info.name, ic_catalog.credentials); + DropEntry(context, info); +} + +} // namespace duckdb diff --git a/src/storage/irc_table_entry.cpp b/src/storage/irc_table_entry.cpp new file mode 100644 index 0000000..189f156 --- /dev/null +++ b/src/storage/irc_table_entry.cpp @@ -0,0 +1,129 @@ +#include "storage/irc_catalog.hpp" +#include "storage/irc_schema_entry.hpp" +#include 
"storage/irc_table_entry.hpp" +#include "storage/irc_transaction.hpp" +#include "duckdb/storage/statistics/base_statistics.hpp" +#include "duckdb/storage/table_storage_info.hpp" +#include "duckdb/main/extension_util.hpp" +#include "duckdb/main/database.hpp" +#include "duckdb/main/secret/secret_manager.hpp" +#include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp" +#include "duckdb/parser/tableref/table_function_ref.hpp" +#include "catalog_api.hpp" +#include "duckdb/planner/binder.hpp" +#include "duckdb/planner/tableref/bound_table_function.hpp" +#include "duckdb/planner/logical_operator.hpp" +#include "duckdb/planner/operator/logical_get.hpp" + + +namespace duckdb { + +ICTableEntry::ICTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info) + : TableCatalogEntry(catalog, schema, info) { + this->internal = false; +} + +ICTableEntry::ICTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, ICTableInfo &info) + : TableCatalogEntry(catalog, schema, *info.create_info) { + this->internal = false; +} + +unique_ptr ICTableEntry::GetStatistics(ClientContext &context, column_t column_id) { + return nullptr; +} + +void ICTableEntry::BindUpdateConstraints(Binder &binder, LogicalGet &, LogicalProjection &, LogicalUpdate &, + ClientContext &) { + throw NotImplementedException("BindUpdateConstraints"); +} + +TableFunction ICTableEntry::GetScanFunction(ClientContext &context, unique_ptr &bind_data) { + auto &db = DatabaseInstance::GetDatabase(context); + auto &ic_catalog = catalog.Cast(); + + auto &parquet_function_set = ExtensionUtil::GetTableFunction(db, "parquet_scan"); + auto parquet_scan_function = parquet_function_set.functions.GetFunctionByArguments(context, {LogicalType::VARCHAR}); + + auto &iceberg_function_set = ExtensionUtil::GetTableFunction(db, "iceberg_scan"); + auto iceberg_scan_function = iceberg_function_set.functions.GetFunctionByArguments(context, 
{LogicalType::VARCHAR}); + + D_ASSERT(table_data); + + if (table_data->data_source_format != "ICEBERG") { + throw NotImplementedException("Table '%s' is of unsupported format '%s', ", table_data->name, + table_data->data_source_format); + } + + auto &secret_manager = SecretManager::Get(context); + // Get Credentials from IRC API + auto table_credentials = IRCAPI::GetTableCredentials( + ic_catalog.internal_name, table_data->schema_name, table_data->name, ic_catalog.credentials); + // First check if table credentials are set (possible the IC catalog does not return credentials) + if (!table_credentials.key_id.empty()) { + // Inject secret into secret manager scoped to this path + CreateSecretInfo info(OnCreateConflict::ERROR_ON_CONFLICT, SecretPersistType::TEMPORARY); + info.name = "__internal_ic_" + table_data->table_id; + info.type = "s3"; + info.provider = "config"; + info.storage_type = "memory"; + info.options = { + {"key_id", table_credentials.key_id}, + {"secret", table_credentials.secret}, + {"session_token", table_credentials.session_token}, + {"region", ic_catalog.credentials.aws_region}, + }; + + std::string lc_storage_location; + lc_storage_location.resize(table_data->storage_location.size()); + std::transform(table_data->storage_location.begin(), table_data->storage_location.end(), lc_storage_location.begin(), ::tolower); + size_t metadata_pos = lc_storage_location.find("metadata"); + if (metadata_pos != std::string::npos) { + info.scope = {lc_storage_location.substr(0, metadata_pos)}; + } else { + throw std::runtime_error("Substring not found"); + } + auto my_secret = secret_manager.CreateSecret(context, info); + } + + named_parameter_map_t param_map; + vector return_types; + vector names; + TableFunctionRef empty_ref; + + // Set the S3 path as input to table function + vector inputs = {table_data->storage_location}; + + TableFunctionBindInput bind_input(inputs, param_map, return_types, names, nullptr, nullptr, + iceberg_scan_function, empty_ref); + + 
auto table_ref = iceberg_scan_function.bind_replace(context, bind_input); + + // 1) Create a Binder and bind the parser-level TableRef -> BoundTableRef + auto binder = Binder::CreateBinder(context); + auto bound_ref = binder->Bind(*table_ref); + + // 2) Create a logical plan from the bound reference + unique_ptr logical_plan = binder->CreatePlan(*bound_ref); + + // 3) Recursively search the logical plan for a LogicalGet node + // For a single table function, you often have just one operator: LogicalGet + LogicalOperator *op = logical_plan.get(); + if (op->type != LogicalOperatorType::LOGICAL_GET) { + throw std::runtime_error("Expected a LogicalGet, but got something else!"); + } + + // 4) Access the bind_data inside LogicalGet + auto &get = (LogicalGet &)*op; + bind_data = std::move(get.bind_data); + + return parquet_scan_function; +} + +TableStorageInfo ICTableEntry::GetStorageInfo(ClientContext &context) { + TableStorageInfo result; + // TODO fill info + return result; +} + +} // namespace duckdb diff --git a/src/storage/irc_table_set.cpp b/src/storage/irc_table_set.cpp new file mode 100644 index 0000000..eb84ef7 --- /dev/null +++ b/src/storage/irc_table_set.cpp @@ -0,0 +1,114 @@ +#include "catalog_api.hpp" +#include "catalog_utils.hpp" + +#include "storage/irc_catalog.hpp" +#include "storage/irc_table_set.hpp" +#include "storage/irc_transaction.hpp" +#include "duckdb/parser/parsed_data/create_table_info.hpp" +#include "duckdb/parser/constraints/not_null_constraint.hpp" +#include "duckdb/parser/constraints/unique_constraint.hpp" +#include "duckdb/parser/expression/constant_expression.hpp" +#include "duckdb/planner/parsed_data/bound_create_table_info.hpp" +#include "duckdb/parser/parsed_data/drop_info.hpp" +#include "duckdb/catalog/dependency_list.hpp" +#include "duckdb/parser/parsed_data/create_table_info.hpp" +#include "duckdb/parser/constraints/list.hpp" +#include "storage/irc_schema_entry.hpp" +#include "duckdb/parser/parser.hpp" + +namespace duckdb { + 
+ICTableSet::ICTableSet(ICSchemaEntry &schema) : ICInSchemaSet(schema) { +} + +static ColumnDefinition CreateColumnDefinition(ClientContext &context, IRCAPIColumnDefinition &coldef) { + return {coldef.name, ICUtils::TypeToLogicalType(context, coldef.type_text)}; +} + +unique_ptr ICTableSet::_CreateCatalogEntry(ClientContext &context, IRCAPITable table) { + D_ASSERT(schema.name == table.schema_name); + CreateTableInfo info; + info.table = table.name; + + for (auto &col : table.columns) { + info.columns.AddColumn(CreateColumnDefinition(context, col)); + } + + auto table_entry = make_uniq(catalog, schema, info); + table_entry->table_data = make_uniq(table); + return table_entry; +} + +void ICTableSet::FillEntry(ClientContext &context, unique_ptr &entry) { + auto* derived = static_cast(entry.get()); + if (!derived->table_data->storage_location.empty()) { + return; + } + + auto &ic_catalog = catalog.Cast(); + auto table = IRCAPI::GetTable(catalog.GetName(), catalog.GetDBPath(), schema.name, entry->name, ic_catalog.credentials); + entry = _CreateCatalogEntry(context, table); +} + +void ICTableSet::LoadEntries(ClientContext &context) { + if (!entries.empty()) { + return; + } + + auto &ic_catalog = catalog.Cast(); + // TODO: handle out-of-order columns using position property + auto tables = IRCAPI::GetTables(catalog.GetName(), catalog.GetDBPath(), schema.name, ic_catalog.credentials); + + for (auto &table : tables) { + auto entry = _CreateCatalogEntry(context, table); + CreateEntry(std::move(entry)); + } +} + +optional_ptr ICTableSet::RefreshTable(ClientContext &context, const string &table_name) { + auto table_info = GetTableInfo(context, schema, table_name); + auto table_entry = make_uniq(catalog, schema, *table_info); + auto table_ptr = table_entry.get(); + CreateEntry(std::move(table_entry)); + return table_ptr; +} + +unique_ptr ICTableSet::GetTableInfo(ClientContext &context, ICSchemaEntry &schema, + const string &table_name) { + throw 
NotImplementedException("ICTableSet::GetTableInfo"); +} + +optional_ptr ICTableSet::CreateTable(ClientContext &context, BoundCreateTableInfo &info) { + auto &ic_catalog = catalog.Cast(); + auto *table_info = dynamic_cast(info.base.get()); + auto table = IRCAPI::CreateTable(catalog.GetName(), ic_catalog.internal_name, schema.name, ic_catalog.credentials, table_info); + auto entry = _CreateCatalogEntry(context, table); + return CreateEntry(std::move(entry)); +} + +void ICTableSet::DropTable(ClientContext &context, DropInfo &info) { + auto &ic_catalog = catalog.Cast(); + IRCAPI::DropTable(catalog.GetName(), ic_catalog.internal_name, schema.name, info.name, ic_catalog.credentials); +} + +void ICTableSet::AlterTable(ClientContext &context, RenameTableInfo &info) { + throw NotImplementedException("ICTableSet::AlterTable"); +} + +void ICTableSet::AlterTable(ClientContext &context, RenameColumnInfo &info) { + throw NotImplementedException("ICTableSet::AlterTable"); +} + +void ICTableSet::AlterTable(ClientContext &context, AddColumnInfo &info) { + throw NotImplementedException("ICTableSet::AlterTable"); +} + +void ICTableSet::AlterTable(ClientContext &context, RemoveColumnInfo &info) { + throw NotImplementedException("ICTableSet::AlterTable"); +} + +void ICTableSet::AlterTable(ClientContext &context, AlterTableInfo &alter) { + throw NotImplementedException("ICTableSet::AlterTable"); +} + +} // namespace duckdb diff --git a/src/storage/irc_transaction.cpp b/src/storage/irc_transaction.cpp new file mode 100644 index 0000000..5a55318 --- /dev/null +++ b/src/storage/irc_transaction.cpp @@ -0,0 +1,36 @@ +#include "storage/irc_transaction.hpp" +#include "storage/irc_catalog.hpp" +#include "duckdb/parser/parsed_data/create_view_info.hpp" +#include "duckdb/catalog/catalog_entry/index_catalog_entry.hpp" +#include "duckdb/catalog/catalog_entry/view_catalog_entry.hpp" + +namespace duckdb { + +ICTransaction::ICTransaction(IRCatalog &ic_catalog, TransactionManager &manager, 
ClientContext &context) + : Transaction(manager, context), access_mode(ic_catalog.access_mode) { + // connection = ICConnection::Open(ic_catalog.path); +} + +ICTransaction::~ICTransaction() = default; + +void ICTransaction::Start() { + transaction_state = ICTransactionState::TRANSACTION_NOT_YET_STARTED; +} +void ICTransaction::Commit() { + if (transaction_state == ICTransactionState::TRANSACTION_STARTED) { + transaction_state = ICTransactionState::TRANSACTION_FINISHED; + // connection.Execute("COMMIT"); + } +} +void ICTransaction::Rollback() { + if (transaction_state == ICTransactionState::TRANSACTION_STARTED) { + transaction_state = ICTransactionState::TRANSACTION_FINISHED; + // connection.Execute("ROLLBACK"); + } +} + +ICTransaction &ICTransaction::Get(ClientContext &context, Catalog &catalog) { + return Transaction::Get(context, catalog).Cast(); +} + +} // namespace duckdb diff --git a/src/storage/irc_transaction_manager.cpp b/src/storage/irc_transaction_manager.cpp new file mode 100644 index 0000000..52bc5f7 --- /dev/null +++ b/src/storage/irc_transaction_manager.cpp @@ -0,0 +1,40 @@ +#include "storage/irc_transaction_manager.hpp" +#include "duckdb/main/attached_database.hpp" + +namespace duckdb { + +ICTransactionManager::ICTransactionManager(AttachedDatabase &db_p, IRCatalog &ic_catalog) + : TransactionManager(db_p), ic_catalog(ic_catalog) { +} + +Transaction &ICTransactionManager::StartTransaction(ClientContext &context) { + auto transaction = make_uniq(ic_catalog, *this, context); + transaction->Start(); + auto &result = *transaction; + lock_guard l(transaction_lock); + transactions[result] = std::move(transaction); + return result; +} + +ErrorData ICTransactionManager::CommitTransaction(ClientContext &context, Transaction &transaction) { + auto &ic_transaction = transaction.Cast(); + ic_transaction.Commit(); + lock_guard l(transaction_lock); + transactions.erase(transaction); + return ErrorData(); +} + +void 
ICTransactionManager::RollbackTransaction(Transaction &transaction) { + auto &ic_transaction = transaction.Cast(); + ic_transaction.Rollback(); + lock_guard l(transaction_lock); + transactions.erase(transaction); +} + +void ICTransactionManager::Checkpoint(ClientContext &context, bool force) { + auto &transaction = ICTransaction::Get(context, db.GetCatalog()); + // auto &db = transaction.GetConnection(); + // db.Execute("CHECKPOINT"); +} + +} // namespace duckdb diff --git a/test/sql/iceberg_catalog_read.test b/test/sql/iceberg_catalog_read.test new file mode 100644 index 0000000..33deddd --- /dev/null +++ b/test/sql/iceberg_catalog_read.test @@ -0,0 +1,91 @@ +# name: test/sql/iceberg_catalog_read.test +# description: test integration with iceberg catalog read +# group: [iceberg] + +require-env ICEBERG_SERVER_AVAILABLE + +require iceberg + +require parquet + +require httpfs + +statement ok +CREATE SECRET ( + TYPE ICEBERG, + ENDPOINT 'http://127.0.0.1:8181' + ); + + +statement ok +CREATE SECRET ( + TYPE S3, + KEY_ID 'admin', + SECRET 'password', + ENDPOINT '127.0.0.1:9000', + URL_STYLE 'path', + USE_SSL 0 + ); + +statement ok +ATTACH '' AS my_datalake (TYPE ICEBERG); + +query IIIIII +Show all tables; +---- +my_datalake default table_mor_deletes [__] [INTEGER] false +my_datalake default table_partitioned [__] [INTEGER] false +my_datalake default table_unpartitioned [__] [INTEGER] false + +statement error +select * from table_unpartitioned +---- +:.*table_unpartitioned does not exist.* + +statement error +select * from table_unpartitioned +---- +:.*Did you mean.*my_datalake.default.table_unpartitioned.* + +query III +select * from my_datalake.default.table_unpartitioned order by all; +---- +2023-03-01 1 a +2023-03-02 2 b +2023-03-03 3 c +2023-03-04 4 d +2023-03-05 5 e +2023-03-06 6 f +2023-03-07 7 g +2023-03-08 8 h +2023-03-09 9 i +2023-03-10 10 j +2023-03-11 11 k +2023-03-12 12 l + +# test deletes (see provision.py for where deletes occur) +query III +select * from 
my_datalake.default.table_mor_deletes order by all;
----
2023-03-01	1	a
2023-03-02	2	b
2023-03-03	3	c
2023-03-10	10	j
2023-03-11	11	k
2023-03-12	12	l

# Writes through the attached Iceberg catalog are not supported yet: every DML
# statement must fail with a "Not implemented" error.
statement error
update my_datalake.default.table_unpartitioned set number = 5 where number < 5;
----
:.*Not implemented Error.*

statement error
delete from my_datalake.default.table_unpartitioned where number < 5;
----
:.*Not implemented Error.*

statement error
insert into my_datalake.default.table_unpartitioned values ('2023-03-13', 13, 'm');
----
:.*Not implemented Error.*

diff --git a/test/sql/iceberg_scan_generated_data_1.test_slow b/test/sql/iceberg_scan_generated_data_1.test_slow
index a5856fd..b5b5a10 100644
--- a/test/sql/iceberg_scan_generated_data_1.test_slow
+++ b/test/sql/iceberg_scan_generated_data_1.test_slow
@@ -10,32 +10,32 @@ require-env DUCKDB_ICEBERG_HAVE_TEST_DATA
 
 # Check count matches
 query I
-SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/generated_1/pyspark_iceberg_table');
+SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/generated_spec1_0_001/pyspark_iceberg_table');
 ----
-:data/iceberg/generated_1/expected_results/last/count.csv
+:data/iceberg/generated_spec1_0_001/expected_results/last/count.csv
 
 # Check data is identical, sorting by uuid to guarantee unique order.
 query I nosort q1
-SELECT COUNT(*) FROM ICEBERG_SCAN('data/iceberg/generated_1/pyspark_iceberg_table');
+SELECT COUNT(*) FROM ICEBERG_SCAN('data/iceberg/generated_spec1_0_001/pyspark_iceberg_table');
 ----
 
 query I nosort q1
-SELECT COUNT(*) FROM PARQUET_SCAN('data/iceberg/generated_1/expected_results/last/data/*.parquet');
+SELECT COUNT(*) FROM PARQUET_SCAN('data/iceberg/generated_spec1_0_001/expected_results/last/data/*.parquet');
 ----
 
 query I nosort q2
-SELECT COUNT(*), MIN(l_suppkey_long), MAX(l_suppkey_long), SUM(l_suppkey_long) FROM ICEBERG_SCAN('data/iceberg/generated_1/pyspark_iceberg_table');
+SELECT COUNT(*), MIN(l_suppkey_long), MAX(l_suppkey_long), SUM(l_suppkey_long) FROM ICEBERG_SCAN('data/iceberg/generated_spec1_0_001/pyspark_iceberg_table');
 ----
 
 query I nosort q2
-SELECT COUNT(*), MIN(l_suppkey_long), MAX(l_suppkey_long), SUM(l_suppkey_long) FROM PARQUET_SCAN('data/iceberg/generated_1/expected_results/last/data/*.parquet');
+SELECT COUNT(*), MIN(l_suppkey_long), MAX(l_suppkey_long), SUM(l_suppkey_long) FROM PARQUET_SCAN('data/iceberg/generated_spec1_0_001/expected_results/last/data/*.parquet');
 ----
 
 # Full table compare: very slow
 query I nosort q3
-SELECT * FROM ICEBERG_SCAN('data/iceberg/generated_1/pyspark_iceberg_table') WHERE uuid NOT NULL ORDER BY uuid;
+SELECT * FROM ICEBERG_SCAN('data/iceberg/generated_spec1_0_001/pyspark_iceberg_table') WHERE uuid NOT NULL ORDER BY uuid;
 ----
 
 query I nosort q3
-SELECT * FROM PARQUET_SCAN('data/iceberg/generated_1/expected_results/last/data/*.parquet') WHERE uuid NOT NULL ORDER BY uuid;
+SELECT * FROM PARQUET_SCAN('data/iceberg/generated_spec1_0_001/expected_results/last/data/*.parquet') WHERE uuid NOT NULL ORDER BY uuid;
 ----
diff --git a/vcpkg.json b/vcpkg.json
index 94ab7f7..17661df 100644
--- a/vcpkg.json
+++ b/vcpkg.json
@@ -4,6 +4,7 @@
       "name": "avro-cpp",
       "features": ["snappy"]
     },
+    "curl",
     "openssl"
   ],
   "vcpkg-configuration": {