Skip to content

Commit

Permalink
abstract the field_id into Value, as it might need to be a string lat…
Browse files Browse the repository at this point in the history
…er (for delta)
  • Loading branch information
Tishj committed Dec 11, 2024
1 parent 539efef commit 9bc095c
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 14 deletions.
3 changes: 2 additions & 1 deletion extension/parquet/parquet_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,15 @@ static MultiFileReaderBindData BindSchema(ClientContext &context, vector<Logical
vector<string> schema_col_names;
vector<LogicalType> schema_col_types;
MultiFileReaderBindData bind_data;
bind_data.mapping = MultiFileReaderColumnMapping::BY_FIELD_ID;
schema_col_names.reserve(options.schema.size());
schema_col_types.reserve(options.schema.size());
for (const auto &column : options.schema) {
schema_col_names.push_back(column.name);
schema_col_types.push_back(column.type);

auto res = MultiFileReaderColumn(column.name, column.type);
res.field_id = column.field_id;
res.identifier = Value::INTEGER(column.field_id);
res.default_expression = make_uniq<ConstantExpression>(column.default_value);
bind_data.schema.emplace_back(std::move(res));
}
Expand Down
4 changes: 2 additions & 2 deletions extension/parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,12 +503,12 @@ void ParquetReader::InitializeSchema(ClientContext &context) {
auto &child_readers = struct_reader.child_readers;
D_ASSERT(root_type.id() == LogicalTypeId::STRUCT);
// FIXME: what is this + 1 logically for???
D_ASSERT(child_types.size() + 1 == child_readers.size());
D_ASSERT(child_readers.size() >= child_types.size());
for (idx_t i = 0; i < child_types.size(); i++) {
auto &type_pair = child_types[i];
auto column = MultiFileReaderColumn(type_pair.first, type_pair.second);
if (child_readers[i]->Schema().__isset.field_id) {
column.field_id = child_readers[i]->Schema().field_id;
column.identifier = Value::INTEGER(child_readers[i]->Schema().field_id);
}
columns.emplace_back(std::move(column));
}
Expand Down
127 changes: 119 additions & 8 deletions src/common/multi_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "duckdb/function/table_function.hpp"
#include "duckdb/main/config.hpp"
#include "duckdb/planner/expression/bound_columnref_expression.hpp"
#include "duckdb/parser/expression/constant_expression.hpp"
#include "duckdb/common/string_util.hpp"

#include <algorithm>
Expand Down Expand Up @@ -52,7 +53,6 @@ void MultiFileReader::AddParameters(TableFunction &table_function) {
table_function.named_parameters["union_by_name"] = LogicalType::BOOLEAN;
table_function.named_parameters["hive_types"] = LogicalType::ANY;
table_function.named_parameters["hive_types_autocast"] = LogicalType::BOOLEAN;
table_function.named_parameters["column_mapping"] = LogicalType::VARCHAR;
}

vector<string> MultiFileReader::ParsePaths(const Value &input) {
Expand Down Expand Up @@ -306,11 +306,12 @@ MultiFileReader::InitializeGlobalState(ClientContext &context, const MultiFileRe
return nullptr;
}

void MultiFileReader::CreateNameMapping(const string &file_name, const vector<MultiFileReaderColumn> &local_columns,
const vector<MultiFileReaderColumn> &global_columns,
const vector<ColumnIndex> &global_column_ids, MultiFileReaderData &reader_data,
const MultiFileReaderBindData &bind_data, const string &initial_file,
optional_ptr<MultiFileReaderGlobalState> global_state) {
void MultiFileReader::CreateMappingByName(const string &file_name, const vector<MultiFileReaderColumn> &local_columns,
const vector<MultiFileReaderColumn> &global_columns,
const vector<ColumnIndex> &global_column_ids,
MultiFileReaderData &reader_data, const MultiFileReaderBindData &bind_data,
const string &initial_file,
optional_ptr<MultiFileReaderGlobalState> global_state) {
// we have expected types: create a map of name -> column index
case_insensitive_map_t<idx_t> name_map;
for (idx_t col_idx = 0; col_idx < local_columns.size(); col_idx++) {
Expand All @@ -335,7 +336,7 @@ void MultiFileReader::CreateNameMapping(const string &file_name, const vector<Mu
auto global_id = global_idx.GetPrimaryIndex();
if (global_id >= global_columns.size()) {
throw InternalException(
"MultiFileReader::CreatePositionalMapping - global_id is out of range in global_types for this file");
"MultiFileReader::CreateMappingByName - global_id is out of range in global_types for this file");
}
auto &global_name = global_columns[global_id].name;
auto entry = name_map.find(global_name);
Expand Down Expand Up @@ -376,12 +377,123 @@ void MultiFileReader::CreateNameMapping(const string &file_name, const vector<Mu
reader_data.empty_columns = reader_data.column_indexes.empty();
}

void MultiFileReader::CreateMappingByFieldId(const string &file_name,
const vector<MultiFileReaderColumn> &local_columns,
const vector<MultiFileReaderColumn> &global_columns,
const vector<ColumnIndex> &global_column_ids,
MultiFileReaderData &reader_data, const MultiFileReaderBindData &bind_data,
const string &initial_file,
optional_ptr<MultiFileReaderGlobalState> global_state) {
// we have expected types: create a map of name -> column index
#ifdef DEBUG
//! Make sure the global columns have field_ids to match on
for (auto &column : global_columns) {
D_ASSERT(!column.identifier.IsNull());
D_ASSERT(column.identifier.type().id() == LogicalTypeId::INTEGER);
}
#endif

unordered_map<idx_t, idx_t> field_id_map;
for (idx_t col_idx = 0; col_idx < local_columns.size(); col_idx++) {
auto &column = local_columns[col_idx];
if (!column.identifier.IsNull()) {
// Extra columns at the end will not have a field_id
break;
}
auto field_id = column.identifier.GetValue<int32_t>();
field_id_map[field_id] = col_idx;
}

// loop through the schema definition
for (idx_t i = 0; i < global_column_ids.size(); i++) {

// check if this is a constant column
bool constant = false;
for (auto &entry : reader_data.constant_map) {
if (entry.column_id == i) {
constant = true;
break;
}
}
if (constant) {
// this column is constant for this file
continue;
}

// Handle any generate columns that are not in the schema (currently only file_row_number)
auto &global_idx = global_column_ids[i];
auto global_id = global_column_ids[i].GetPrimaryIndex();
if (global_id >= field_id_map.size()) {
if (bind_data.file_row_number_idx == global_id) {
reader_data.column_mapping.push_back(i);
// FIXME: 'reader.file_row_number_idx'
// It seems to me that we need to push some extra local state for this
reader_data.column_ids.push_back(42);
}
continue;
}

const auto &global_column = global_columns[global_id];
D_ASSERT(!global_column.identifier.IsNull());
auto it = field_id_map.find(global_column.identifier.GetValue<int32_t>());
if (it == field_id_map.end()) {
// field id not present in file, use default value
auto &default_val = global_column.default_expression;
D_ASSERT(default_val);
if (default_val->type != ExpressionType::VALUE_CONSTANT) {
throw NotImplementedException("Default expression that isn't constant is not supported yet");
}
auto &constant_expr = default_val->Cast<ConstantExpression>();
reader_data.constant_map.emplace_back(i, constant_expr.value);
continue;
}

const auto &local_id = it->second;
auto &local_column = local_columns[local_id];
ColumnIndex local_index(local_id);
if (local_column.type != global_column.type) {
// differing types, wrap in a cast column reader
reader_data.cast_map[local_id] = global_column.type;
} else {
local_index = ColumnIndex(local_id, global_idx.GetChildIndexes());
}

reader_data.column_mapping.push_back(i);
reader_data.column_ids.push_back(local_id);
reader_data.column_indexes.push_back(std::move(local_index));
}
reader_data.empty_columns = reader_data.column_ids.empty();
}

//! Entry point for creating the global -> local column mapping of a file.
//! Forwards all arguments unchanged to the routine selected by 'bind_data.mapping':
//! BY_NAME -> CreateMappingByName, BY_FIELD_ID -> CreateMappingByFieldId.
//! Throws InternalException for any other mapping type.
void MultiFileReader::CreateNameMapping(const string &file_name, const vector<MultiFileReaderColumn> &local_columns,
                                        const vector<MultiFileReaderColumn> &global_columns,
                                        const vector<ColumnIndex> &global_column_ids, MultiFileReaderData &reader_data,
                                        const MultiFileReaderBindData &bind_data, const string &initial_file,
                                        optional_ptr<MultiFileReaderGlobalState> global_state) {
	const auto mapping_type = bind_data.mapping;
	if (mapping_type == MultiFileReaderColumnMapping::BY_NAME) {
		CreateMappingByName(file_name, local_columns, global_columns, global_column_ids, reader_data, bind_data,
		                    initial_file, global_state);
		return;
	}
	if (mapping_type == MultiFileReaderColumnMapping::BY_FIELD_ID) {
		CreateMappingByFieldId(file_name, local_columns, global_columns, global_column_ids, reader_data, bind_data,
		                       initial_file, global_state);
		return;
	}
	// neither of the supported mapping types
	throw InternalException("Unsupported MultiFileReaderColumnMapping type");
}

void MultiFileReader::CreateMapping(const string &file_name, const vector<MultiFileReaderColumn> &local_columns,
const vector<MultiFileReaderColumn> &global_columns,
const vector<ColumnIndex> &global_column_ids, optional_ptr<TableFilterSet> filters,
MultiFileReaderData &reader_data, const string &initial_file,
const MultiFileReaderBindData &bind_data,
optional_ptr<MultiFileReaderGlobalState> global_state) {
// copy global columns and inject any different defaults
CreateNameMapping(file_name, local_columns, global_columns, global_column_ids, reader_data, bind_data, initial_file,
global_state);
CreateFilterMap(global_columns, filters, reader_data, global_state);
Expand Down Expand Up @@ -481,7 +593,6 @@ void MultiFileReaderOptions::AddBatchInfo(BindInfo &bind_info) const {
bind_info.InsertOption("auto_detect_hive_partitioning", Value::BOOLEAN(auto_detect_hive_partitioning));
bind_info.InsertOption("union_by_name", Value::BOOLEAN(union_by_name));
bind_info.InsertOption("hive_types_autocast", Value::BOOLEAN(hive_types_autocast));
bind_info.InsertOption("column_mapping", Value("by_name"));
}

void UnionByName::CombineUnionTypes(const vector<string> &col_names, const vector<LogicalType> &sql_types,
Expand Down
20 changes: 17 additions & 3 deletions src/include/duckdb/common/multi_file_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct MultiFileReaderColumn {
MultiFileReaderColumn(const MultiFileReaderColumn &other)
: name(other.name), type(other.type), children(other.children),
default_expression(other.default_expression ? other.default_expression->Copy() : nullptr),
field_id(other.field_id) {
identifier(other.identifier) {
}

MultiFileReaderColumn(MultiFileReaderColumn &&other) noexcept = default;
Expand All @@ -53,7 +53,7 @@ struct MultiFileReaderColumn {
type = other.type;
children = other.children;
default_expression = other.default_expression ? other.default_expression->Copy() : nullptr;
field_id = other.field_id;
identifier = other.identifier;
}
return *this;
}
Expand Down Expand Up @@ -87,7 +87,9 @@ struct MultiFileReaderColumn {
LogicalType type;
vector<MultiFileReaderColumn> children;
unique_ptr<ParsedExpression> default_expression;
optional_idx field_id;

//! Either the field_id or the name to map on
Value identifier;
};

//! The bind data for the multi-file reader, obtained through MultiFileReader::BindReader
Expand All @@ -100,6 +102,8 @@ struct MultiFileReaderBindData {
idx_t file_row_number_idx = DConstants::INVALID_INDEX;
//! (optional) The schema set by the multi file reader
vector<MultiFileReaderColumn> schema;
//! The method used to map local -> global columns
MultiFileReaderColumnMapping mapping = MultiFileReaderColumnMapping::BY_NAME;

DUCKDB_API void Serialize(Serializer &serializer) const;
DUCKDB_API static MultiFileReaderBindData Deserialize(Deserializer &deserializer);
Expand Down Expand Up @@ -367,6 +371,16 @@ struct MultiFileReader {
const vector<ColumnIndex> &global_column_ids, MultiFileReaderData &reader_data,
const MultiFileReaderBindData &bind_data, const string &initial_file,
optional_ptr<MultiFileReaderGlobalState> global_state);
//! Create the global -> local column mapping for a file by matching the field id stored in the
//! 'identifier' of each column; global columns whose field id is not present in the file are filled
//! from their (constant) default expression
virtual void CreateMappingByFieldId(const string &file_name, const vector<MultiFileReaderColumn> &local_columns,
const vector<MultiFileReaderColumn> &global_columns,
const vector<ColumnIndex> &global_column_ids, MultiFileReaderData &reader_data,
const MultiFileReaderBindData &bind_data, const string &initial_file,
optional_ptr<MultiFileReaderGlobalState> global_state);
//! Create the global -> local column mapping for a file by matching on column name
//! (the lookup map over the local column names is case-insensitive)
virtual void CreateMappingByName(const string &file_name, const vector<MultiFileReaderColumn> &local_columns,
const vector<MultiFileReaderColumn> &global_columns,
const vector<ColumnIndex> &global_column_ids, MultiFileReaderData &reader_data,
const MultiFileReaderBindData &bind_data, const string &initial_file,
optional_ptr<MultiFileReaderGlobalState> global_state);

//! Used in errors to report which function is using this MultiFileReader
string function_name;
Expand Down

0 comments on commit 9bc095c

Please sign in to comment.