Skip to content

Commit

Permalink
Merge pull request duckdb#63 from teaguesterling/issues/29
Browse files Browse the repository at this point in the history
Addresses duckdb#29: Support missing version-hint.txt and provide additional options
  • Loading branch information
samansmink authored Aug 13, 2024
2 parents 10e0862 + 201be6b commit 75f880d
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 39 deletions.
54 changes: 37 additions & 17 deletions src/common/iceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,30 +167,51 @@ IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, File
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec, skip_schema_inference);
}

// Function to generate a metadata file url
string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, const string &table_version, const string &metadata_compression_codec) {
if (metadata_compression_codec != "gzip") {
return fs.JoinPath(meta_path, "v" + table_version + ".metadata.json");
// Function to generate a metadata file url from version and format string
// default format is "v%s%s.metadata.json" -> v00###-xxxxxxxxx-.gz.metadata.json"
string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &table_version, string &metadata_compression_codec, string &version_format = DEFAULT_TABLE_VERSION_FORMAT) {
// TODO: Need to URL Encode table_version
string compression_suffix = "";
string url;
if (metadata_compression_codec == "gzip") {
compression_suffix = ".gz";
}
for(auto try_format : StringUtil::Split(version_format, ',')) {
url = fs.JoinPath(meta_path, StringUtil::Format(try_format, table_version, compression_suffix));
if(fs.FileExists(url)) {
return url;
}
}
return fs.JoinPath(meta_path, "v" + table_version + ".gz.metadata.json");

throw IOException(
"Iceberg metadata file not found for table version '%s' using '%s' compression and format(s): '%s'", table_version, metadata_compression_codec, version_format);
}

string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) {
string metadata_file_path;

string IcebergSnapshot::GetMetaDataPath(const string &path, FileSystem &fs, string metadata_compression_codec, string table_version = DEFAULT_VERSION_HINT_FILE, string version_format = DEFAULT_TABLE_VERSION_FORMAT) {
if (StringUtil::EndsWith(path, ".json")) {
metadata_file_path = path;
return path;
}

auto meta_path = fs.JoinPath(path, "metadata");
string version_hint;
if(StringUtil::EndsWith(table_version, ".text")||StringUtil::EndsWith(table_version, ".txt")) {
version_hint = GetTableVersion(meta_path, fs, table_version);
} else {
auto table_version = GetTableVersion(path, fs);
auto meta_path = fs.JoinPath(path, "metadata");
metadata_file_path = GenerateMetaDataUrl(fs, meta_path, table_version, metadata_compression_codec);
version_hint = table_version;
}
return GenerateMetaDataUrl(fs, meta_path, version_hint, metadata_compression_codec, version_format);
}


string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) {
if (metadata_compression_codec == "gzip") {
return IcebergUtils::GzFileToString(metadata_file_path, fs);
return IcebergUtils::GzFileToString(path, fs);
}
return IcebergUtils::FileToString(metadata_file_path, fs);
return IcebergUtils::FileToString(path, fs);
}


IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas, string metadata_compression_codec,
bool skip_schema_inference) {
Expand All @@ -217,9 +238,8 @@ IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t icebe
return ret;
}

string IcebergSnapshot::GetTableVersion(const string &path, FileSystem &fs) {
auto meta_path = fs.JoinPath(path, "metadata");
auto version_file_path = fs.JoinPath(meta_path, "version-hint.text");
string IcebergSnapshot::GetTableVersion(const string &meta_path, FileSystem &fs, string version_file = DEFAULT_VERSION_HINT_FILE) {
auto version_file_path = fs.JoinPath(meta_path, version_file);
auto version_file_content = IcebergUtils::FileToString(version_file_path, fs);

try {
Expand Down Expand Up @@ -288,4 +308,4 @@ yyjson_val *IcebergSnapshot::IcebergSnapshot::FindSnapshotByIdTimestampInternal(
return max_snapshot;
}

} // namespace duckdb
} // namespace duckdb
26 changes: 20 additions & 6 deletions src/iceberg_functions/iceberg_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl
bool allow_moved_paths = false;
string metadata_compression_codec = "none";
bool skip_schema_inference = false;
string table_version = DEFAULT_VERSION_HINT_FILE;
string version_name_format = DEFAULT_TABLE_VERSION_FORMAT;

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
Expand All @@ -66,20 +68,26 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl
metadata_compression_codec = StringValue::Get(kv.second);
} else if (loption == "skip_schema_inference") {
skip_schema_inference = BooleanValue::Get(kv.second);
} else if (loption == "version") {
table_version = StringValue::Get(kv.second);
} else if (loption == "version_name_format") {
version_name_format = StringValue::Get(kv.second);
}
}

auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(iceberg_path, fs, metadata_compression_codec, table_version, version_name_format);
IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_meta_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_meta_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_meta_path, fs, metadata_compression_codec, skip_schema_inference);
}

ret->iceberg_table =
Expand Down Expand Up @@ -143,23 +151,29 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() {

auto fun = TableFunction({LogicalType::VARCHAR}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
fun.named_parameters["version"] = LogicalType::VARCHAR;
fun.named_parameters["version_name_format"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
fun.named_parameters["version"] = LogicalType::VARCHAR;
fun.named_parameters["version_name_format"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
fun.named_parameters["version"] = LogicalType::VARCHAR;
fun.named_parameters["version_name_format"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

return function_set;
Expand Down
19 changes: 16 additions & 3 deletions src/iceberg_functions/iceberg_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
bool skip_schema_inference = false;
string mode = "default";
string metadata_compression_codec = "none";
string table_version = DEFAULT_VERSION_HINT_FILE;
string version_name_format = DEFAULT_TABLE_VERSION_FORMAT;

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
Expand All @@ -229,20 +231,25 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
metadata_compression_codec = StringValue::Get(kv.second);
} else if (loption == "skip_schema_inference") {
skip_schema_inference = BooleanValue::Get(kv.second);
} else if (loption == "version") {
table_version = StringValue::Get(kv.second);
} else if (loption == "version_name_format") {
version_name_format = StringValue::Get(kv.second);
}
}
auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(iceberg_path, fs, metadata_compression_codec, table_version, version_name_format);
IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_meta_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_meta_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_meta_path, fs, metadata_compression_codec, skip_schema_inference);
}

IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec);
Expand Down Expand Up @@ -277,6 +284,8 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
fun.named_parameters["version"] = LogicalType::VARCHAR;
fun.named_parameters["version_name_format"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, nullptr, nullptr,
Expand All @@ -286,6 +295,8 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
fun.named_parameters["version"] = LogicalType::VARCHAR;
fun.named_parameters["version_name_format"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, nullptr, nullptr,
Expand All @@ -295,6 +306,8 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
fun.named_parameters["version"] = LogicalType::VARCHAR;
fun.named_parameters["version_name_format"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

return function_set;
Expand Down
17 changes: 16 additions & 1 deletion src/iceberg_functions/iceberg_snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ struct IcebergSnaphotsBindData : public TableFunctionData {
IcebergSnaphotsBindData() {};
string filename;
string metadata_compression_codec;
string table_version;
string version_name_format;
bool skip_schema_inference = false;
};

Expand All @@ -29,7 +31,10 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState
auto global_state = make_uniq<IcebergSnapshotGlobalTableFunctionState>();

FileSystem &fs = FileSystem::GetFileSystem(context);
global_state->metadata_file = IcebergSnapshot::ReadMetaData(bind_data.filename, fs, bind_data.metadata_compression_codec);

auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(
bind_data.filename, fs, bind_data.metadata_compression_codec, bind_data.table_version, bind_data.version_name_format);
global_state->metadata_file = IcebergSnapshot::ReadMetaData(iceberg_meta_path, fs, bind_data.metadata_compression_codec);
global_state->metadata_doc =
yyjson_read(global_state->metadata_file.c_str(), global_state->metadata_file.size(), 0);
auto root = yyjson_doc_get_root(global_state->metadata_doc);
Expand All @@ -50,19 +55,27 @@ static unique_ptr<FunctionData> IcebergSnapshotsBind(ClientContext &context, Tab
auto bind_data = make_uniq<IcebergSnaphotsBindData>();

string metadata_compression_codec = "none";
string table_version = DEFAULT_VERSION_HINT_FILE;
string version_name_format = DEFAULT_TABLE_VERSION_FORMAT;
bool skip_schema_inference = false;

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
} else if (loption == "version") {
table_version = StringValue::Get(kv.second);
} else if (loption == "version_name_format") {
version_name_format = StringValue::Get(kv.second);
} else if (loption == "skip_schema_inference") {
skip_schema_inference = BooleanValue::Get(kv.second);
}
}
bind_data->filename = input.inputs[0].ToString();
bind_data->metadata_compression_codec = metadata_compression_codec;
bind_data->skip_schema_inference = skip_schema_inference;
bind_data->table_version = table_version;
bind_data->version_name_format = version_name_format;

names.emplace_back("sequence_number");
return_types.emplace_back(LogicalType::UBIGINT);
Expand Down Expand Up @@ -115,6 +128,8 @@ TableFunctionSet IcebergFunctions::GetIcebergSnapshotsFunction() {
TableFunction table_function({LogicalType::VARCHAR}, IcebergSnapshotsFunction, IcebergSnapshotsBind,
IcebergSnapshotGlobalTableFunctionState::Init);
table_function.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
table_function.named_parameters["version"] = LogicalType::VARCHAR;
table_function.named_parameters["version_name_format"] = LogicalType::VARCHAR;
table_function.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
function_set.AddFunction(table_function);
return function_set;
Expand Down
19 changes: 13 additions & 6 deletions src/include/iceberg_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ using namespace duckdb_yyjson;

namespace duckdb {

// First arg is version string, arg is either empty or ".gz" if gzip
// Allows for both "v###.gz.metadata.json" and "###.metadata.json" styles
static string DEFAULT_TABLE_VERSION_FORMAT = "v%s%s.metadata.json,%s%s.metadata.json";

static string DEFAULT_VERSION_HINT_FILE = "version-hint.text";

struct IcebergColumnDefinition {
public:
static IcebergColumnDefinition ParseFromJson(yyjson_val *val);
Expand Down Expand Up @@ -61,19 +67,20 @@ class IcebergSnapshot {
vector<IcebergColumnDefinition> schema;
string metadata_compression_codec = "none";

static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, string GetSnapshotByTimestamp, bool skip_schema_inference);
static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string GetSnapshotByTimestamp, bool skip_schema_inference);
static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string GetSnapshotByTimestamp, bool skip_schema_inference);
static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, string metadata_compression_codec, bool skip_schema_inference);
static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string metadata_compression_codec, bool skip_schema_inference);
static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec, bool skip_schema_inference);

static IcebergSnapshot ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas, string metadata_compression_codec, bool skip_schema_inference);
static string ReadMetaData(const string &path, FileSystem &fs, string GetSnapshotByTimestamp);
static string GetMetaDataPath(const string &path, FileSystem &fs, string metadata_compression_codec, string table_version, string version_format);
static string ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec);
static yyjson_val *GetSnapshots(const string &path, FileSystem &fs, string GetSnapshotByTimestamp);
static unique_ptr<SnapshotParseInfo> GetParseInfo(yyjson_doc &metadata_json);

protected:
//! Internal JSON parsing functions
static string GetTableVersion(const string &path, FileSystem &fs);
static string GetTableVersion(const string &path, FileSystem &fs, string version_format);
static yyjson_val *FindLatestSnapshotInternal(yyjson_val *snapshots);
static yyjson_val *FindSnapshotByIdInternal(yyjson_val *snapshots, idx_t target_id);
static yyjson_val *FindSnapshotByIdTimestampInternal(yyjson_val *snapshots, timestamp_t timestamp);
Expand Down Expand Up @@ -124,4 +131,4 @@ struct IcebergTable {
string path;
};

} // namespace duckdb
} // namespace duckdb
Loading

0 comments on commit 75f880d

Please sign in to comment.