diff --git a/.gitignore b/.gitignore index eccf73f..cb608ef 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ build +.vscode .idea cmake-build-debug duckdb_unittest_tempdir/ @@ -10,4 +11,4 @@ test/sql/tmp.test data/iceberg/generated_* scripts/metastore_db/ scripts/derby.log -scripts/test-script-with-path.sql \ No newline at end of file +scripts/test-script-with-path.sql diff --git a/data/iceberg/lineitem_iceberg_gz/data/.00000-2-371a340c-ded5-4e85-aa49-9c788d6f21cd-00001.parquet.crc b/data/iceberg/lineitem_iceberg_gz/data/.00000-2-371a340c-ded5-4e85-aa49-9c788d6f21cd-00001.parquet.crc new file mode 100644 index 0000000..f88d18d Binary files /dev/null and b/data/iceberg/lineitem_iceberg_gz/data/.00000-2-371a340c-ded5-4e85-aa49-9c788d6f21cd-00001.parquet.crc differ diff --git a/data/iceberg/lineitem_iceberg_gz/data/00000-2-371a340c-ded5-4e85-aa49-9c788d6f21cd-00001.parquet b/data/iceberg/lineitem_iceberg_gz/data/00000-2-371a340c-ded5-4e85-aa49-9c788d6f21cd-00001.parquet new file mode 100644 index 0000000..63b23d3 Binary files /dev/null and b/data/iceberg/lineitem_iceberg_gz/data/00000-2-371a340c-ded5-4e85-aa49-9c788d6f21cd-00001.parquet differ diff --git a/data/iceberg/lineitem_iceberg_gz/metadata/.23f9dbea-1e7f-4694-a82c-dc3c9a94953e-m0.avro.crc b/data/iceberg/lineitem_iceberg_gz/metadata/.23f9dbea-1e7f-4694-a82c-dc3c9a94953e-m0.avro.crc new file mode 100644 index 0000000..c2fab3e Binary files /dev/null and b/data/iceberg/lineitem_iceberg_gz/metadata/.23f9dbea-1e7f-4694-a82c-dc3c9a94953e-m0.avro.crc differ diff --git a/data/iceberg/lineitem_iceberg_gz/metadata/.snap-4468019210336628573-1-23f9dbea-1e7f-4694-a82c-dc3c9a94953e.avro.crc b/data/iceberg/lineitem_iceberg_gz/metadata/.snap-4468019210336628573-1-23f9dbea-1e7f-4694-a82c-dc3c9a94953e.avro.crc new file mode 100644 index 0000000..a27fd90 Binary files /dev/null and b/data/iceberg/lineitem_iceberg_gz/metadata/.snap-4468019210336628573-1-23f9dbea-1e7f-4694-a82c-dc3c9a94953e.avro.crc differ diff --git 
a/data/iceberg/lineitem_iceberg_gz/metadata/.v1.gz.metadata.json.crc b/data/iceberg/lineitem_iceberg_gz/metadata/.v1.gz.metadata.json.crc new file mode 100644 index 0000000..48de20c Binary files /dev/null and b/data/iceberg/lineitem_iceberg_gz/metadata/.v1.gz.metadata.json.crc differ diff --git a/data/iceberg/lineitem_iceberg_gz/metadata/.v2.gz.metadata.json.crc b/data/iceberg/lineitem_iceberg_gz/metadata/.v2.gz.metadata.json.crc new file mode 100644 index 0000000..915a750 Binary files /dev/null and b/data/iceberg/lineitem_iceberg_gz/metadata/.v2.gz.metadata.json.crc differ diff --git a/data/iceberg/lineitem_iceberg_gz/metadata/.version-hint.text.crc b/data/iceberg/lineitem_iceberg_gz/metadata/.version-hint.text.crc new file mode 100644 index 0000000..2003120 Binary files /dev/null and b/data/iceberg/lineitem_iceberg_gz/metadata/.version-hint.text.crc differ diff --git a/data/iceberg/lineitem_iceberg_gz/metadata/23f9dbea-1e7f-4694-a82c-dc3c9a94953e-m0.avro b/data/iceberg/lineitem_iceberg_gz/metadata/23f9dbea-1e7f-4694-a82c-dc3c9a94953e-m0.avro new file mode 100644 index 0000000..8425c93 Binary files /dev/null and b/data/iceberg/lineitem_iceberg_gz/metadata/23f9dbea-1e7f-4694-a82c-dc3c9a94953e-m0.avro differ diff --git a/data/iceberg/lineitem_iceberg_gz/metadata/snap-4468019210336628573-1-23f9dbea-1e7f-4694-a82c-dc3c9a94953e.avro b/data/iceberg/lineitem_iceberg_gz/metadata/snap-4468019210336628573-1-23f9dbea-1e7f-4694-a82c-dc3c9a94953e.avro new file mode 100644 index 0000000..9e26d01 Binary files /dev/null and b/data/iceberg/lineitem_iceberg_gz/metadata/snap-4468019210336628573-1-23f9dbea-1e7f-4694-a82c-dc3c9a94953e.avro differ diff --git a/data/iceberg/lineitem_iceberg_gz/metadata/v1.gz.metadata.json b/data/iceberg/lineitem_iceberg_gz/metadata/v1.gz.metadata.json new file mode 100644 index 0000000..efc8150 Binary files /dev/null and b/data/iceberg/lineitem_iceberg_gz/metadata/v1.gz.metadata.json differ diff --git 
a/data/iceberg/lineitem_iceberg_gz/metadata/v2.gz.metadata.json b/data/iceberg/lineitem_iceberg_gz/metadata/v2.gz.metadata.json new file mode 100644 index 0000000..a48a7c9 Binary files /dev/null and b/data/iceberg/lineitem_iceberg_gz/metadata/v2.gz.metadata.json differ diff --git a/data/iceberg/lineitem_iceberg_gz/metadata/version-hint.text b/data/iceberg/lineitem_iceberg_gz/metadata/version-hint.text new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/data/iceberg/lineitem_iceberg_gz/metadata/version-hint.text @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/src/common/iceberg.cpp b/src/common/iceberg.cpp index 3c15105..ba2f284 100644 --- a/src/common/iceberg.cpp +++ b/src/common/iceberg.cpp @@ -13,7 +13,7 @@ namespace duckdb { IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &snapshot, FileSystem &fs, - bool allow_moved_paths) { + bool allow_moved_paths, string metadata_compression_codec) { IcebergTable ret; ret.path = iceberg_path; ret.snapshot = snapshot; @@ -118,8 +118,8 @@ unique_ptr IcebergSnapshot::GetParseInfo(yyjson_doc &metadata return make_uniq(std::move(info)); } -unique_ptr IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs) { - auto metadata_json = ReadMetaData(path, fs); +unique_ptr IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs, string metadata_compression_codec) { + auto metadata_json = ReadMetaData(path, fs, metadata_compression_codec); auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0); auto parse_info = GetParseInfo(*doc); @@ -130,61 +130,76 @@ unique_ptr IcebergSnapshot::GetParseInfo(const string &path, return std::move(parse_info); } -IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs) { - auto info = GetParseInfo(path, fs); +IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs, + string metadata_compression_codec, bool skip_schema_inference) { + auto info = GetParseInfo(path, 
fs, metadata_compression_codec); auto latest_snapshot = FindLatestSnapshotInternal(info->snapshots); if (!latest_snapshot) { throw IOException("No snapshots found"); } - return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas); + return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec, skip_schema_inference); } -IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id) { - auto info = GetParseInfo(path, fs); +IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, + string metadata_compression_codec, bool skip_schema_inference) { + auto info = GetParseInfo(path, fs, metadata_compression_codec); auto snapshot = FindSnapshotByIdInternal(info->snapshots, snapshot_id); if (!snapshot) { throw IOException("Could not find snapshot with id " + to_string(snapshot_id)); } - return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas); + return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, + metadata_compression_codec, skip_schema_inference); } -IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp) { - auto info = GetParseInfo(path, fs); +IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec, + bool skip_schema_inference) { + auto info = GetParseInfo(path, fs, metadata_compression_codec); auto snapshot = FindSnapshotByIdTimestampInternal(info->snapshots, timestamp); if (!snapshot) { throw IOException("Could not find latest snapshots for timestamp " + Timestamp::ToString(timestamp)); } - return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas); + return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, 
metadata_compression_codec, skip_schema_inference); } -string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs) { - string metadata_file_path; +// Builds the path of the versioned metadata file inside meta_path: "v" + table_version + ".gz.metadata.json" for the "gzip" codec, "v" + table_version + ".metadata.json" for any other codec value +static string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, const string &table_version, const string &metadata_compression_codec) { + if (metadata_compression_codec != "gzip") { + return fs.JoinPath(meta_path, "v" + table_version + ".metadata.json"); + } + return fs.JoinPath(meta_path, "v" + table_version + ".gz.metadata.json"); +} +string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) { + string metadata_file_path; if (StringUtil::EndsWith(path, ".json")) { metadata_file_path = path; } else { auto table_version = GetTableVersion(path, fs); auto meta_path = fs.JoinPath(path, "metadata"); - metadata_file_path = fs.JoinPath(meta_path, "v" + table_version + ".metadata.json"); + metadata_file_path = GenerateMetaDataUrl(fs, meta_path, table_version, metadata_compression_codec); } + if (metadata_compression_codec == "gzip") { + return IcebergUtils::GzFileToString(metadata_file_path, fs); + } return IcebergUtils::FileToString(metadata_file_path, fs); } IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id, - vector &schemas) { + vector &schemas, string metadata_compression_codec, + bool skip_schema_inference) { IcebergSnapshot ret; auto snapshot_tag = yyjson_get_tag(snapshot); if (snapshot_tag != YYJSON_TYPE_OBJ) { throw IOException("Invalid snapshot field found parsing iceberg metadata.json"); } - + ret.metadata_compression_codec = metadata_compression_codec; if (iceberg_format_version == 1) { ret.sequence_number = 0; } else if (iceberg_format_version == 2) { @@ -196,8 +211,9 @@ IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t icebe ret.manifest_list = IcebergUtils::TryGetStrFromObject(snapshot, "manifest-list"); 
ret.iceberg_format_version = iceberg_format_version; ret.schema_id = schema_id; - ret.schema = ParseSchema(schemas, ret.schema_id); - + if (!skip_schema_inference) { + ret.schema = ParseSchema(schemas, ret.schema_id); + } return ret; } diff --git a/src/common/utils.cpp b/src/common/utils.cpp index c0272bf..230a3c7 100644 --- a/src/common/utils.cpp +++ b/src/common/utils.cpp @@ -1,5 +1,8 @@ #include "duckdb.hpp" #include "iceberg_utils.hpp" +#include "zlib.h" +#include "fstream" +#include "duckdb/common/gzip_file_system.hpp" namespace duckdb { @@ -12,6 +15,13 @@ string IcebergUtils::FileToString(const string &path, FileSystem &fs) { return ret_val; } +// Reads the gzip-compressed file at path fully into a string and returns its decompressed contents +string IcebergUtils::GzFileToString(const string &path, FileSystem &fs) { + // Load the raw compressed bytes first, then decompress them in memory via GZipFileSystem + string gzipped_string = FileToString(path, fs); + return GZipFileSystem::UncompressGZIPString(gzipped_string); +} + string IcebergUtils::GetFullPath(const string &iceberg_path, const string &relative_file_path, FileSystem &fs) { std::size_t found = relative_file_path.find("/metadata/"); if (found != string::npos) { diff --git a/src/iceberg_functions/iceberg_metadata.cpp b/src/iceberg_functions/iceberg_metadata.cpp index 5c68d41..2b5f857 100644 --- a/src/iceberg_functions/iceberg_metadata.cpp +++ b/src/iceberg_functions/iceberg_metadata.cpp @@ -55,30 +55,35 @@ static unique_ptr IcebergMetaDataBind(ClientContext &context, Tabl auto iceberg_path = input.inputs[0].ToString(); bool allow_moved_paths = false; - + string metadata_compression_codec = "none"; + bool skip_schema_inference = false; + for (auto &kv : input.named_parameters) { auto loption = StringUtil::Lower(kv.first); if (loption == "allow_moved_paths") { allow_moved_paths = BooleanValue::Get(kv.second); + } else if (loption == "metadata_compression_codec") { + metadata_compression_codec = StringValue::Get(kv.second); + } else if (loption == "skip_schema_inference") { + skip_schema_inference = 
BooleanValue::Get(kv.second); } } - IcebergSnapshot snapshot_to_scan; if (input.inputs.size() > 1) { if (input.inputs[1].type() == LogicalType::UBIGINT) { - snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue()); + snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); } else if (input.inputs[1].type() == LogicalType::TIMESTAMP) { snapshot_to_scan = - IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue()); + IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); } else { throw InvalidInputException("Unknown argument type in IcebergScanBindReplace."); } } else { - snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs); + snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference); } ret->iceberg_table = - make_uniq(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths)); + make_uniq(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec)); auto manifest_types = IcebergManifest::Types(); return_types.insert(return_types.end(), manifest_types.begin(), manifest_types.end()); @@ -138,17 +143,23 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() { auto fun = TableFunction({LogicalType::VARCHAR}, IcebergMetaDataFunction, IcebergMetaDataBind, IcebergMetaDataGlobalTableFunctionState::Init); + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; + fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; function_set.AddFunction(fun); fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, IcebergMetaDataFunction, IcebergMetaDataBind, 
IcebergMetaDataGlobalTableFunctionState::Init); + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; + fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; function_set.AddFunction(fun); fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, IcebergMetaDataFunction, IcebergMetaDataBind, IcebergMetaDataGlobalTableFunctionState::Init); + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; + fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; function_set.AddFunction(fun); return function_set; diff --git a/src/iceberg_functions/iceberg_scan.cpp b/src/iceberg_functions/iceberg_scan.cpp index b63622f..fdb4934 100644 --- a/src/iceberg_functions/iceberg_scan.cpp +++ b/src/iceberg_functions/iceberg_scan.cpp @@ -129,16 +129,18 @@ static Value GetParquetSchemaParam(vector &schema) { //! 
Build the Parquet Scan expression for the files we need to scan static unique_ptr MakeScanExpression(vector &data_file_values, vector &delete_file_values, - vector &schema, bool allow_moved_paths) { + vector &schema, bool allow_moved_paths, string metadata_compression_codec, bool skip_schema_inference) { // No deletes, just return a TableFunctionRef for a parquet scan of the data files if (delete_file_values.empty()) { auto table_function_ref_data = make_uniq(); table_function_ref_data->alias = "iceberg_scan_data"; vector> left_children; left_children.push_back(make_uniq(Value::LIST(data_file_values))); - left_children.push_back( - make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), - make_uniq(GetParquetSchemaParam(schema)))); + if (!skip_schema_inference) { + left_children.push_back( + make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), + make_uniq(GetParquetSchemaParam(schema)))); + } table_function_ref_data->function = make_uniq("parquet_scan", std::move(left_children)); return std::move(table_function_ref_data); } @@ -169,10 +171,11 @@ static unique_ptr MakeScanExpression(vector &data_file_values, left_children.push_back(make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("file_row_number"), make_uniq(Value(1)))); - left_children.push_back( - make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), - make_uniq(GetParquetSchemaParam(schema)))); - + if (!skip_schema_inference) { + left_children.push_back( + make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), + make_uniq(GetParquetSchemaParam(schema)))); + } table_function_ref_data->function = make_uniq("parquet_scan", std::move(left_children)); join_node->left = std::move(table_function_ref_data); @@ -208,7 +211,9 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table // this allows hive tables to be moved and have mismatching paths, usefull for testing, but will have worse // performance bool allow_moved_paths = false; + bool skip_schema_inference = false; 
string mode = "default"; + string metadata_compression_codec = "none"; for (auto &kv : input.named_parameters) { auto loption = StringUtil::Lower(kv.first); @@ -220,24 +225,27 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table } } else if (loption == "mode") { mode = StringValue::Get(kv.second); + } else if (loption == "metadata_compression_codec") { + metadata_compression_codec = StringValue::Get(kv.second); + } else if (loption == "skip_schema_inference") { + skip_schema_inference = BooleanValue::Get(kv.second); } } - IcebergSnapshot snapshot_to_scan; if (input.inputs.size() > 1) { if (input.inputs[1].type() == LogicalType::UBIGINT) { - snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue()); + snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); } else if (input.inputs[1].type() == LogicalType::TIMESTAMP) { snapshot_to_scan = - IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue()); + IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); } else { throw InvalidInputException("Unknown argument type in IcebergScanBindReplace."); } } else { - snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs); + snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference); } - IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths); + IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec); auto data_files = iceberg_table.GetPaths(); auto delete_files = iceberg_table.GetPaths(); vector data_file_values; @@ -254,7 +262,7 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table if (mode == 
"list_files") { return MakeListFilesExpression(data_file_values, delete_file_values); } else if (mode == "default") { - return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths); + return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference); } else { throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'"); } @@ -265,22 +273,28 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() { auto fun = TableFunction({LogicalType::VARCHAR}, nullptr, nullptr, IcebergScanGlobalTableFunctionState::Init); fun.bind_replace = IcebergScanBindReplace; + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["mode"] = LogicalType::VARCHAR; + fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; function_set.AddFunction(fun); fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, nullptr, nullptr, IcebergScanGlobalTableFunctionState::Init); fun.bind_replace = IcebergScanBindReplace; + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["mode"] = LogicalType::VARCHAR; + fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; function_set.AddFunction(fun); fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, nullptr, nullptr, IcebergScanGlobalTableFunctionState::Init); fun.bind_replace = IcebergScanBindReplace; + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["mode"] = LogicalType::VARCHAR; + fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; function_set.AddFunction(fun); return function_set; diff 
--git a/src/iceberg_functions/iceberg_snapshots.cpp b/src/iceberg_functions/iceberg_snapshots.cpp index 6d6ef57..ec76b42 100644 --- a/src/iceberg_functions/iceberg_snapshots.cpp +++ b/src/iceberg_functions/iceberg_snapshots.cpp @@ -12,6 +12,8 @@ namespace duckdb { struct IcebergSnaphotsBindData : public TableFunctionData { IcebergSnaphotsBindData() {}; string filename; + string metadata_compression_codec; + bool skip_schema_inference = false; }; struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState { @@ -22,11 +24,12 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState } } static unique_ptr Init(ClientContext &context, TableFunctionInitInput &input) { + auto bind_data = input.bind_data->Cast(); auto global_state = make_uniq(); - + FileSystem &fs = FileSystem::GetFileSystem(context); - global_state->metadata_file = IcebergSnapshot::ReadMetaData(bind_data.filename, fs); + global_state->metadata_file = IcebergSnapshot::ReadMetaData(bind_data.filename, fs, bind_data.metadata_compression_codec); global_state->metadata_doc = yyjson_read(global_state->metadata_file.c_str(), global_state->metadata_file.size(), 0); auto root = yyjson_doc_get_root(global_state->metadata_doc); @@ -45,8 +48,21 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState static unique_ptr IcebergSnapshotsBind(ClientContext &context, TableFunctionBindInput &input, vector &return_types, vector &names) { auto bind_data = make_uniq(); - + + string metadata_compression_codec = "none"; + bool skip_schema_inference = false; + + for (auto &kv : input.named_parameters) { + auto loption = StringUtil::Lower(kv.first); + if (loption == "metadata_compression_codec") { + metadata_compression_codec = StringValue::Get(kv.second); + } else if (loption == "skip_schema_inference") { + skip_schema_inference = BooleanValue::Get(kv.second); + } + } bind_data->filename = input.inputs[0].ToString(); + bind_data->metadata_compression_codec 
= metadata_compression_codec; + bind_data->skip_schema_inference = skip_schema_inference; names.emplace_back("sequence_number"); return_types.emplace_back(LogicalType::UBIGINT); @@ -63,19 +79,21 @@ static unique_ptr IcebergSnapshotsBind(ClientContext &context, Tab return std::move(bind_data); } // Snapshots function static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput &data, DataChunk &output) { auto &global_state = data.global_state->Cast(); - + auto &bind_data = data.bind_data->Cast(); idx_t i = 0; while (auto next_snapshot = yyjson_arr_iter_next(&global_state.snapshot_it)) { if (i >= STANDARD_VECTOR_SIZE) { break; } + auto parse_info = IcebergSnapshot::GetParseInfo(*global_state.metadata_doc); auto snapshot = IcebergSnapshot::ParseSnapShot(next_snapshot, global_state.iceberg_format_version, - parse_info->schema_id, parse_info->schemas); + parse_info->schema_id, parse_info->schemas, bind_data.metadata_compression_codec, + bind_data.skip_schema_inference); FlatVector::GetData(output.data[0])[i] = snapshot.sequence_number; FlatVector::GetData(output.data[1])[i] = snapshot.snapshot_id; @@ -92,6 +110,8 @@ TableFunctionSet IcebergFunctions::GetIcebergSnapshotsFunction() { TableFunctionSet function_set("iceberg_snapshots"); TableFunction table_function({LogicalType::VARCHAR}, IcebergSnapshotsFunction, IcebergSnapshotsBind, IcebergSnapshotGlobalTableFunctionState::Init); + table_function.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; + table_function.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; function_set.AddFunction(table_function); return std::move(function_set); } diff --git a/src/include/iceberg_metadata.hpp b/src/include/iceberg_metadata.hpp index 496e76c..9dd142d 100644 --- a/src/include/iceberg_metadata.hpp +++ b/src/include/iceberg_metadata.hpp @@ -57,15 
+57,16 @@ class IcebergSnapshot { idx_t iceberg_format_version; uint64_t schema_id; vector schema; + string metadata_compression_codec = "none"; - static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs); - static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id); - static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp); + static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, string metadata_compression_codec, bool skip_schema_inference); + static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string metadata_compression_codec, bool skip_schema_inference); + static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec, bool skip_schema_inference); static IcebergSnapshot ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id, - vector &schemas); - static string ReadMetaData(const string &path, FileSystem &fs); - static yyjson_val *GetSnapshots(const string &path, FileSystem &fs); + vector &schemas, string metadata_compression_codec, bool skip_schema_inference); + static string ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec); + static yyjson_val *GetSnapshots(const string &path, FileSystem &fs, string metadata_compression_codec); static unique_ptr GetParseInfo(yyjson_doc &metadata_json); protected: @@ -75,7 +76,7 @@ class IcebergSnapshot { static yyjson_val *FindSnapshotByIdInternal(yyjson_val *snapshots, idx_t target_id); static yyjson_val *FindSnapshotByIdTimestampInternal(yyjson_val *snapshots, timestamp_t timestamp); static vector ParseSchema(vector &schemas, idx_t schema_id); - static unique_ptr GetParseInfo(const string &path, FileSystem &fs); + static unique_ptr GetParseInfo(const string &path, FileSystem &fs, string metadata_compression_codec); }; //! 
Represents the iceberg table at a specific IcebergSnapshot. Corresponds to a single Manifest List. @@ -83,7 +84,7 @@ struct IcebergTable { public: //! Loads all(!) metadata of into IcebergTable object static IcebergTable Load(const string &iceberg_path, IcebergSnapshot &snapshot, FileSystem &fs, - bool allow_moved_paths = false); + bool allow_moved_paths = false, string metadata_compression_codec = "none"); //! Returns all paths to be scanned for the IcebergManifestContentType template diff --git a/src/include/iceberg_utils.hpp b/src/include/iceberg_utils.hpp index 3dbced7..52c67eb 100644 --- a/src/include/iceberg_utils.hpp +++ b/src/include/iceberg_utils.hpp @@ -18,7 +18,8 @@ class IcebergUtils { public: //! Downloads a file fully into a string static string FileToString(const string &path, FileSystem &fs); - + //! Downloads a gz file fully into a string + static string GzFileToString(const string &path, FileSystem &fs); //! Somewhat hacky function that allows relative paths in iceberg tables to be resolved, //! used for the allow_moved_paths debug option which allows us to test with iceberg tables that //! 
were moved without their paths updated diff --git a/test/sql/iceberg_metadata.test b/test/sql/iceberg_metadata.test index ca8044e..3238c1c 100644 --- a/test/sql/iceberg_metadata.test +++ b/test/sql/iceberg_metadata.test @@ -16,6 +16,16 @@ SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg', ALLOW_MOVED_PATH lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m1.avro 2 DATA ADDED EXISTING lineitem_iceberg/data/00041-414-f3c73457-bbd6-4b92-9c15-17b241171b16-00001.parquet PARQUET 51793 lineitem_iceberg/metadata/10eaca8a-1e1c-421e-ad6d-b232e5ee23d3-m0.avro 2 DATA DELETED EXISTING lineitem_iceberg/data/00000-411-0792dcfe-4e25-4ca3-8ada-175286069a47-00001.parquet PARQUET 60175 +statement error +SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE); +---- +IO Error: Cannot open file + +query IIIIIIII +SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip"); +---- +lineitem_iceberg_gz/metadata/23f9dbea-1e7f-4694-a82c-dc3c9a94953e-m0.avro 0 DATA ADDED EXISTING lineitem_iceberg_gz/data/00000-2-371a340c-ded5-4e85-aa49-9c788d6f21cd-00001.parquet PARQUET 111968 + statement error SELECT * FROM ICEBERG_METADATA('data/iceberg/lineitem_iceberg_nonexistent'); ---- diff --git a/test/sql/iceberg_scan.test b/test/sql/iceberg_scan.test index 841008c..e505d1c 100644 --- a/test/sql/iceberg_scan.test +++ b/test/sql/iceberg_scan.test @@ -58,4 +58,14 @@ SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', '2023-02-15 1 statement error FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg', '2023-02-15 15:07:54.503'::TIMESTAMP, ALLOW_MOVED_PATHS=TRUE); ---- -IO Error: Could not find latest snapshots for timestamp 2023-02-15 15:07:54.503 \ No newline at end of file +IO Error: Could not find latest snapshots for timestamp 2023-02-15 15:07:54.503 + +statement error +SELECT * FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE); +---- 
+IO Error: Cannot open file + +query I +SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/lineitem_iceberg_gz', ALLOW_MOVED_PATHS=TRUE, METADATA_COMPRESSION_CODEC="gzip"); +---- +111968 diff --git a/test/sql/iceberg_snapshots.test b/test/sql/iceberg_snapshots.test index 6b93f12..39d29d8 100644 --- a/test/sql/iceberg_snapshots.test +++ b/test/sql/iceberg_snapshots.test @@ -21,4 +21,14 @@ SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg'); statement error SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_nonexistent'); ---- -IO Error: Cannot open file "data/iceberg/lineitem_iceberg_nonexistent/metadata/version-hint.text": No such file or directory \ No newline at end of file +IO Error: Cannot open file "data/iceberg/lineitem_iceberg_nonexistent/metadata/version-hint.text": No such file or directory + +statement error +SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_gz'); +---- +IO Error: Cannot open file + +query IIII +SELECT * FROM ICEBERG_SNAPSHOTS('data/iceberg/lineitem_iceberg_gz', METADATA_COMPRESSION_CODEC="gzip"); +---- +0 4468019210336628573 2024-03-13 18:38:58.602 lineitem_iceberg_gz/metadata/snap-4468019210336628573-1-23f9dbea-1e7f-4694-a82c-dc3c9a94953e.avro \ No newline at end of file