Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support to skip schema inference - Archived #43

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
build
.vscode
.idea
cmake-build-debug
duckdb_unittest_tempdir/
Expand All @@ -10,4 +11,4 @@ test/sql/tmp.test
data/iceberg/generated_*
scripts/metastore_db/
scripts/derby.log
scripts/test-script-with-path.sql
scripts/test-script-with-path.sql
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2
54 changes: 35 additions & 19 deletions src/common/iceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace duckdb {

IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &snapshot, FileSystem &fs,
bool allow_moved_paths) {
bool allow_moved_paths, string metadata_compression_codec) {
IcebergTable ret;
ret.path = iceberg_path;
ret.snapshot = snapshot;
Expand Down Expand Up @@ -118,8 +118,8 @@ unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(yyjson_doc &metadata
return make_uniq<SnapshotParseInfo>(std::move(info));
}

unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs) {
auto metadata_json = ReadMetaData(path, fs);
unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path, FileSystem &fs, string metadata_compression_codec) {
auto metadata_json = ReadMetaData(path, fs, metadata_compression_codec);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto parse_info = GetParseInfo(*doc);

Expand All @@ -130,61 +130,76 @@ unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path,
return std::move(parse_info);
}

IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs) {
auto info = GetParseInfo(path, fs);
IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs,
string metadata_compression_codec, bool skip_schema_inference) {
auto info = GetParseInfo(path, fs, metadata_compression_codec);
auto latest_snapshot = FindLatestSnapshotInternal(info->snapshots);

if (!latest_snapshot) {
throw IOException("No snapshots found");
}

return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas);
return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec, skip_schema_inference);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id) {
auto info = GetParseInfo(path, fs);
IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id,
string metadata_compression_codec, bool skip_schema_inference) {
auto info = GetParseInfo(path, fs, metadata_compression_codec);
auto snapshot = FindSnapshotByIdInternal(info->snapshots, snapshot_id);

if (!snapshot) {
throw IOException("Could not find snapshot with id " + to_string(snapshot_id));
}

return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas);
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas,
metadata_compression_codec, skip_schema_inference);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp) {
auto info = GetParseInfo(path, fs);
IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec,
bool skip_schema_inference) {
auto info = GetParseInfo(path, fs, metadata_compression_codec);
auto snapshot = FindSnapshotByIdTimestampInternal(info->snapshots, timestamp);

if (!snapshot) {
throw IOException("Could not find latest snapshots for timestamp " + Timestamp::ToString(timestamp));
}

return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas);
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec, skip_schema_inference);
}

string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs) {
string metadata_file_path;
// Function to generate a metadata file url
string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, const string &table_version, const string &metadata_compression_codec) {
if (metadata_compression_codec != "gzip") {
return fs.JoinPath(meta_path, "v" + table_version + ".metadata.json");
}
return fs.JoinPath(meta_path, "v" + table_version + ".gz.metadata.json");
}

string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) {
string metadata_file_path;
if (StringUtil::EndsWith(path, ".json")) {
metadata_file_path = path;
} else {
auto table_version = GetTableVersion(path, fs);
auto meta_path = fs.JoinPath(path, "metadata");
metadata_file_path = fs.JoinPath(meta_path, "v" + table_version + ".metadata.json");
metadata_file_path = GenerateMetaDataUrl(fs, meta_path, table_version, metadata_compression_codec);
}

if (metadata_compression_codec == "gzip") {
return IcebergUtils::GzFileToString(metadata_file_path, fs);
}
return IcebergUtils::FileToString(metadata_file_path, fs);
}

IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas) {
vector<yyjson_val *> &schemas, string metadata_compression_codec,
bool skip_schema_inference) {
IcebergSnapshot ret;
auto snapshot_tag = yyjson_get_tag(snapshot);
if (snapshot_tag != YYJSON_TYPE_OBJ) {
throw IOException("Invalid snapshot field found parsing iceberg metadata.json");
}

ret.metadata_compression_codec = metadata_compression_codec;
if (iceberg_format_version == 1) {
ret.sequence_number = 0;
} else if (iceberg_format_version == 2) {
Expand All @@ -196,8 +211,9 @@ IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t icebe
ret.manifest_list = IcebergUtils::TryGetStrFromObject(snapshot, "manifest-list");
ret.iceberg_format_version = iceberg_format_version;
ret.schema_id = schema_id;
ret.schema = ParseSchema(schemas, ret.schema_id);

if (!skip_schema_inference) {
ret.schema = ParseSchema(schemas, ret.schema_id);
}
return ret;
}

Expand Down
10 changes: 10 additions & 0 deletions src/common/utils.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "duckdb.hpp"
#include "iceberg_utils.hpp"
#include "zlib.h"
#include "fstream"
#include "duckdb/common/gzip_file_system.hpp"

namespace duckdb {

Expand All @@ -12,6 +15,13 @@ string IcebergUtils::FileToString(const string &path, FileSystem &fs) {
return ret_val;
}

// Function to decompress a gz file content string
string IcebergUtils::GzFileToString(const string &path, FileSystem &fs) {
// Initialize zlib variables
string gzipped_string = FileToString(path, fs);
return GZipFileSystem::UncompressGZIPString(gzipped_string);
}

string IcebergUtils::GetFullPath(const string &iceberg_path, const string &relative_file_path, FileSystem &fs) {
std::size_t found = relative_file_path.find("/metadata/");
if (found != string::npos) {
Expand Down
23 changes: 17 additions & 6 deletions src/iceberg_functions/iceberg_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,35 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl
auto iceberg_path = input.inputs[0].ToString();

bool allow_moved_paths = false;

string metadata_compression_codec = "none";
bool skip_schema_inference = false;

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (loption == "allow_moved_paths") {
allow_moved_paths = BooleanValue::Get(kv.second);
} else if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
} else if (loption == "skip_schema_inference") {
skip_schema_inference = BooleanValue::Get(kv.second);
}
}

IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>());
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>());
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference);
}

ret->iceberg_table =
make_uniq<IcebergTable>(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths));
make_uniq<IcebergTable>(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec));

auto manifest_types = IcebergManifest::Types();
return_types.insert(return_types.end(), manifest_types.begin(), manifest_types.end());
Expand Down Expand Up @@ -138,17 +143,23 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() {

auto fun = TableFunction({LogicalType::VARCHAR}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

return function_set;
Expand Down
42 changes: 28 additions & 14 deletions src/iceberg_functions/iceberg_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,18 @@ static Value GetParquetSchemaParam(vector<IcebergColumnDefinition> &schema) {

//! Build the Parquet Scan expression for the files we need to scan
static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values, vector<Value> &delete_file_values,
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths) {
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths, string metadata_compression_codec, bool skip_schema_inference) {
// No deletes, just return a TableFunctionRef for a parquet scan of the data files
if (delete_file_values.empty()) {
auto table_function_ref_data = make_uniq<TableFunctionRef>();
table_function_ref_data->alias = "iceberg_scan_data";
vector<unique_ptr<ParsedExpression>> left_children;
left_children.push_back(make_uniq<ConstantExpression>(Value::LIST(data_file_values)));
left_children.push_back(
make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("schema"),
make_uniq<ConstantExpression>(GetParquetSchemaParam(schema))));
if (!skip_schema_inference) {
left_children.push_back(
make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("schema"),
make_uniq<ConstantExpression>(GetParquetSchemaParam(schema))));
}
table_function_ref_data->function = make_uniq<FunctionExpression>("parquet_scan", std::move(left_children));
return std::move(table_function_ref_data);
}
Expand Down Expand Up @@ -169,10 +171,11 @@ static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values,
left_children.push_back(make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL,
make_uniq<ColumnRefExpression>("file_row_number"),
make_uniq<ConstantExpression>(Value(1))));
left_children.push_back(
make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("schema"),
make_uniq<ConstantExpression>(GetParquetSchemaParam(schema))));

if (!skip_schema_inference) {
left_children.push_back(
make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("schema"),
make_uniq<ConstantExpression>(GetParquetSchemaParam(schema))));
}
table_function_ref_data->function = make_uniq<FunctionExpression>("parquet_scan", std::move(left_children));
join_node->left = std::move(table_function_ref_data);

Expand Down Expand Up @@ -208,7 +211,9 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
// this allows hive tables to be moved and have mismatching paths, usefull for testing, but will have worse
// performance
bool allow_moved_paths = false;
bool skip_schema_inference = false;
string mode = "default";
string metadata_compression_codec = "none";

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
Expand All @@ -220,24 +225,27 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
}
} else if (loption == "mode") {
mode = StringValue::Get(kv.second);
} else if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
} else if (loption == "skip_schema_inference") {
skip_schema_inference = BooleanValue::Get(kv.second);
}
}

IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>());
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>());
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference);
}

IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths);
IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec);
auto data_files = iceberg_table.GetPaths<IcebergManifestContentType::DATA>();
auto delete_files = iceberg_table.GetPaths<IcebergManifestContentType::DELETE>();
vector<Value> data_file_values;
Expand All @@ -254,7 +262,7 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
if (mode == "list_files") {
return MakeListFilesExpression(data_file_values, delete_file_values);
} else if (mode == "default") {
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths);
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference);
} else {
throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'");
}
Expand All @@ -265,22 +273,28 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {

auto fun = TableFunction({LogicalType::VARCHAR}, nullptr, nullptr, IcebergScanGlobalTableFunctionState::Init);
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, nullptr, nullptr,
IcebergScanGlobalTableFunctionState::Init);
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, nullptr, nullptr,
IcebergScanGlobalTableFunctionState::Init);
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

return function_set;
Expand Down
Loading
Loading