added schema evolution to nested datatypes
Signed-off-by: JustinRush80 <[email protected]>
JustinRush80 committed Jan 27, 2025
1 parent 852a3fb commit 832848e
Showing 2 changed files with 101 additions and 30 deletions.
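In short, the commit extends merge-time schema evolution so that new child columns inside nested struct fields are merged into the table schema, instead of evolution applying only to top-level columns. Below is a minimal sketch of that widening using arrow-rs's public Field::try_merge, a close analogue of this crate's internal merge_arrow_field (which additionally tolerates type promotions that a later cast_record_batch resolves):

use arrow_schema::{DataType, Field, Fields};

fn main() -> Result<(), arrow_schema::ArrowError> {
    // Target table field: a nested struct with only `count`.
    let mut target = Field::new(
        "nested",
        DataType::Struct(Fields::from(vec![Field::new("count", DataType::Int64, true)])),
        true,
    );
    // Source batch field: the same struct plus a new `name` child.
    let source = Field::new(
        "nested",
        DataType::Struct(Fields::from(vec![
            Field::new("count", DataType::Int64, true),
            Field::new("name", DataType::Utf8, true),
        ])),
        true,
    );
    // Merging widens the struct in place; the result is struct<count, name>.
    target.try_merge(&source)?;
    assert!(matches!(target.data_type(), DataType::Struct(fields) if fields.len() == 2));
    Ok(())
}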
crates/core/src/operations/cast/merge_schema.rs (2 changes: 1 addition & 1 deletion)
@@ -292,7 +292,7 @@ pub(crate) fn merge_arrow_field(
 /// Merges the Arrow table schema and the Arrow batch schema, allowing Large/View types to pass through.
 // Sometimes fields can't be merged because they are not the same type. So the table has int32,
 // but the batch int64. We want to preserve the table type. At a later stage we will call cast_record_batch,
-// which will cast the batch int64->int32. This is desired behaviour so we can have flexibility
+// which will cast the batch int64->int32. This is desired behavior so we can have flexibility
 // in the batch data types, but preserve the correct table and parquet types.
 //
 // preserve_new_fields can also be disabled if you just want the passthrough functionality.
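For illustration, the narrowing cast the comment describes bottoms out in Arrow's cast kernel. A minimal sketch with arrow-rs's public arrow::compute::cast (cast_record_batch itself is this crate's wrapper and is not part of this commit):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() -> Result<(), arrow::error::ArrowError> {
    // The batch arrives as int64, but the table (and parquet) type is int32.
    let batch_column: ArrayRef = Arc::new(Int64Array::from(vec![1, 10, 100]));
    // cast_record_batch ultimately relies on this kernel to narrow the values.
    let table_column = cast(&batch_column, &DataType::Int32)?;
    assert_eq!(table_column.data_type(), &DataType::Int32);
    Ok(())
}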
crates/core/src/operations/merge/mod.rs (129 changes: 100 additions & 29 deletions)
@@ -49,7 +49,7 @@ use datafusion::{
     prelude::{cast, DataFrame, SessionContext},
 };
 use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_common::{Column, DFSchema, ScalarValue, TableReference};
+use datafusion_common::{Column, DFSchema, ExprSchema, ScalarValue, TableReference};
 use datafusion_expr::{col, conditional_expressions::CaseBuilder, lit, when, Expr, JoinType};
 use datafusion_expr::{
     ExprSchemable, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode,
@@ -80,7 +80,7 @@ use crate::delta_datafusion::{

 use crate::kernel::{Action, DataCheck, Metadata, StructTypeExt};
 use crate::logstore::LogStoreRef;
-use crate::operations::cast::merge_schema::merge_arrow_schema;
+use crate::operations::cast::merge_schema::{merge_arrow_field, merge_arrow_schema};
 use crate::operations::cdc::*;
 use crate::operations::merge::barrier::find_node;
 use crate::operations::transaction::CommitBuilder;
@@ -968,7 +968,7 @@ async fn execute(
         .collect::<Result<Vec<MergeOperation>, DeltaTableError>>()?;

     // merge_arrow_schema is used to tell whether the two schemas can be merged, but we use the operation statements to pick new columns.
-    // This avoids the side effect of adding unnessary columns: e.g. with `target.id = source.ID`, "ID" will not be added since "id" exists in the target and the end user intended it to be "id".
+    // This avoids the side effect of adding unnecessary columns: e.g. with `target.id = source.ID`, "ID" will not be added since "id" exists in the target and the end user intended it to be "id".
     let mut new_schema = None;
     let mut schema_action = None;
     if merge_schema {
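To see why the new columns come from the operation statements rather than from the merged schema, note that Arrow matches field names case-sensitively, so a naive schema merge would keep both spellings from a predicate like target.id = source.ID. A small illustration using arrow-rs's Schema::try_merge:

use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Predicate `target.id = source.ID`: the names differ only in case.
    let target = Schema::new(vec![Field::new("id", DataType::Utf8, true)]);
    let source = Schema::new(vec![Field::new("ID", DataType::Utf8, true)]);

    // A naive schema merge keeps both spellings as separate columns...
    let naive = Schema::try_merge(vec![target, source]).unwrap();
    assert_eq!(naive.fields().len(), 2);
    // ...which is why execute() instead collects new columns from the
    // UPDATE/INSERT operation expressions the user actually wrote.
}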
@@ -1150,16 +1150,24 @@ async fn execute(
                 None => TableReference::none(),
             };
             let name = delta_field.name();

-            // check if the name of column is in the target table
-            let column = if snapshot.schema().index_of(name).is_none() {
+            let mut cast_type: DataType = delta_field.data_type().try_into()?;
+
+            // Resolve the correct column reference, given that some columns exist only in the source table.
+            let column = if let Some(field) = snapshot.schema().field(name) {
+                if field == delta_field {
+                    Column::new(qualifier.clone(), name)
+                } else {
+                    // When the field has changed (e.g. a column was added to a nested data type),
+                    // casting to the stale table type would break, so use the new field's data type.
+                    let col_ref = Column::new(source_qualifier.clone(), name);
+                    cast_type = source_schema.data_type(&col_ref)?.to_owned();
+                    col_ref
+                }
+            } else {
                 null_target_column = Some(cast(
                     lit(ScalarValue::Null).alias(name),
                     delta_field.data_type().try_into()?,
                 ));
                 Column::new(source_qualifier.clone(), name)
-            } else {
-                Column::new(qualifier.clone(), name)
             };

             for (idx, (operations, _)) in ops.iter().enumerate() {
@@ -1181,22 +1189,23 @@ async fn execute(
                 .end()?;

             let name = "__delta_rs_c_".to_owned() + delta_field.name();

             write_projection.push(cast(
                 Expr::Column(Column::from_name(name.clone())).alias(delta_field.name()),
-                delta_field.data_type().try_into()?,
+                cast_type.clone(),
             ));

             write_projection_with_cdf.push(
                 when(
                     col(CDC_COLUMN_NAME).not_eq(lit("update_preimage")),
                     cast(
                         Expr::Column(Column::from_name(name.clone())),
-                        delta_field.data_type().try_into()?,
+                        cast_type.clone(),
                     ),
                 )
                 .otherwise(null_target_column.unwrap_or(cast(
                     Expr::Column(Column::new(qualifier, delta_field.name())),
-                    delta_field.data_type().try_into()?,
+                    cast_type,
                 )))? // We take the column from the target table, but in case of schema evolution we assign the column as null.
                 .alias(delta_field.name()),
             );
@@ -1511,9 +1520,18 @@ fn modify_schema(
         .filter(|ops| matches!(ops.r#type, OperationType::Update | OperationType::Insert))
         .flat_map(|ops| ops.operations.keys())
     {
-        if target_schema.field_from_column(columns).is_err() {
-            let new_fields = source_schema.field_with_unqualified_name(columns.name())?;
-            ending_schema.push(new_fields.to_owned().with_nullable(true));
+        // This assumes that every column named in the MERGE's UPDATE and INSERT operations exists in the source table.
+        let source_field = source_schema.field_with_unqualified_name(columns.name())?;
+
+        if let Some(target_field) = target_schema.field_from_column(columns).ok() {
+            // For nested data types we first merge, then check whether anything changed, and only then replace the pre-existing field.
+            let new_field = merge_arrow_field(target_field, source_field, true)?;
+            if &new_field == target_field {
+                continue;
+            }
+            ending_schema.try_merge(&Arc::new(new_field))?;
+        } else {
+            ending_schema.push(source_field.to_owned().with_nullable(true));
         }
     }
     Ok(())
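The push/try_merge calls above match arrow-rs's SchemaBuilder API, which upserts a field by name instead of appending a duplicate. Assuming ending_schema is such a builder (an assumption based on those calls, not stated in this hunk), the replace-in-place step looks roughly like this sketch:

use std::sync::Arc;
use arrow_schema::{DataType, Field, Fields, SchemaBuilder};

fn main() -> Result<(), arrow_schema::ArrowError> {
    let mut ending_schema = SchemaBuilder::new();
    // The target already contributed nested: struct<count>.
    ending_schema.push(Field::new(
        "nested",
        DataType::Struct(Fields::from(vec![Field::new("count", DataType::Int64, true)])),
        true,
    ));
    // merge_arrow_field produced a widened struct<count, name>.
    let widened = Field::new(
        "nested",
        DataType::Struct(Fields::from(vec![
            Field::new("count", DataType::Int64, true),
            Field::new("name", DataType::Utf8, true),
        ])),
        true,
    );
    // try_merge matches on the field name, so "nested" is merged in place
    // rather than pushed as a second column.
    ending_schema.try_merge(&Arc::new(widened))?;
    assert_eq!(ending_schema.finish().fields().len(), 1);
    Ok(())
}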
@@ -1687,6 +1705,39 @@ mod tests {
         .unwrap()
     }

+    async fn write_data_struct(table: DeltaTable, schema: &Arc<ArrowSchema>) -> DeltaTable {
+        let count_array = arrow::array::Int64Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+        let nested_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "count",
+            ArrowDataType::Int64,
+            true,
+        )]));
+        let batch = RecordBatch::try_new(
+            Arc::clone(schema),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
+                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
+                Arc::new(arrow::array::StringArray::from(vec![
+                    "2021-02-01",
+                    "2021-02-01",
+                    "2021-02-02",
+                    "2021-02-02",
+                ])),
+                Arc::new(arrow::array::StructArray::from(
+                    RecordBatch::try_new(nested_schema, vec![Arc::new(count_array)]).unwrap(),
+                )),
+            ],
+        )
+        .unwrap();
+        // write some data
+        DeltaOps(table)
+            .write(vec![batch.clone()])
+            .with_schema_mode(crate::operations::write::SchemaMode::Overwrite)
+            .with_save_mode(SaveMode::Overwrite)
+            .await
+            .unwrap()
+    }
+
     fn merge_source(schema: Arc<ArrowSchema>) -> DataFrame {
         let ctx = SessionContext::new();
         let batch = RecordBatch::try_new(
@@ -1977,8 +2028,8 @@ mod tests {
     }

     #[tokio::test]
-    async fn test_merge_with_schema_merge_and_struct_added_column() {
-        let (table, _) = setup().await;
+    async fn test_merge_with_schema_merge_and_pre_existing_struct_added_column() {
+        let table = setup_table(None).await;

         let nested_schema = Arc::new(ArrowSchema::new(vec![Field::new(
             "count",
@@ -1996,19 +2047,39 @@ mod tests {
                 true,
             ),
         ]));
-        let count_array = arrow::array::Int64Array::from(vec![Some(1)]);
+
+        let table_with_struct = write_data_struct(table, &schema).await;
+
+        let nested_schema_source = Arc::new(ArrowSchema::new(vec![Field::new(
+            "name",
+            ArrowDataType::Utf8,
+            true,
+        )]));
+
+        let schema_source = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", ArrowDataType::Utf8, true),
+            Field::new("value", ArrowDataType::Int32, true),
+            Field::new("modified", ArrowDataType::Utf8, true),
+            Field::new(
+                "nested",
+                ArrowDataType::Struct(nested_schema_source.fields().clone()),
+                true,
+            ),
+        ]));
+
+        let name_array = arrow::array::StringArray::from(vec![Some("John")]);
         let id_array = arrow::array::StringArray::from(vec![Some("X")]);
         let value_array = arrow::array::Int32Array::from(vec![Some(1)]);
         let modified_array = arrow::array::StringArray::from(vec![Some("2021-02-02")]);

         let batch = RecordBatch::try_new(
-            schema,
+            schema_source,
             vec![
                 Arc::new(id_array),
                 Arc::new(value_array),
                 Arc::new(modified_array),
                 Arc::new(arrow::array::StructArray::from(
-                    RecordBatch::try_new(nested_schema, vec![Arc::new(count_array)]).unwrap(),
+                    RecordBatch::try_new(nested_schema_source, vec![Arc::new(name_array)]).unwrap(),
                 )),
             ],
         )
@@ -2018,7 +2089,7 @@ mod tests {

         let source = ctx.read_batch(batch).unwrap();

-        let (table, _) = DeltaOps(table.clone())
+        let (table, _) = DeltaOps(table_with_struct)
             .merge(source, col("target.id").eq(col("source.id")))
             .with_source_alias("source")
             .with_target_alias("target")
@@ -2052,15 +2123,15 @@ mod tests {

         assert!(schema_actions);
         let expected = vec![
-            "+----+-------+------------+------------+",
-            "| id | value | modified   | nested     |",
-            "+----+-------+------------+------------+",
-            "| A  | 1     | 2021-02-01 |            |",
-            "| B  | 10    | 2021-02-01 |            |",
-            "| C  | 10    | 2021-02-02 |            |",
-            "| D  | 100   | 2021-02-02 |            |",
-            "| X  | 1     | 2021-02-02 | {count: 1} |",
-            "+----+-------+------------+------------+",
+            "+----+-------+------------+-----------------------+",
+            "| id | value | modified   | nested                |",
+            "+----+-------+------------+-----------------------+",
+            "| A  | 1     | 2021-02-01 | {count: 1, name: }    |",
+            "| B  | 10    | 2021-02-01 | {count: 2, name: }    |",
+            "| C  | 10    | 2021-02-02 | {count: 3, name: }    |",
+            "| D  | 100   | 2021-02-02 | {count: 4, name: }    |",
+            "| X  | 1     | 2021-02-02 | {count: , name: John} |",
+            "+----+-------+------------+-----------------------+",
         ];
         let actual = get_data(&table).await;

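Putting the pieces together, an end-to-end invocation might look like the sketch below. This is illustrative only: with_merge_schema is assumed to be the MergeBuilder switch behind the merge_schema flag used in execute(), and the merge_with_evolution wrapper is hypothetical.

use deltalake::datafusion::prelude::{col, DataFrame};
use deltalake::{DeltaOps, DeltaResult, DeltaTable};

// Merge `source` (whose `nested` struct carries a new child column) into
// `table`, letting schema evolution widen the nested struct.
async fn merge_with_evolution(table: DeltaTable, source: DataFrame) -> DeltaResult<DeltaTable> {
    let (table, _metrics) = DeltaOps(table)
        .merge(source, col("target.id").eq(col("source.id")))
        .with_source_alias("source")
        .with_target_alias("target")
        // Assumed builder switch for the `merge_schema` flag in execute().
        .with_merge_schema(true)
        .when_not_matched_insert(|insert| {
            insert
                .set("id", col("source.id"))
                .set("nested", col("source.nested"))
        })?
        .await?;
    Ok(table)
}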
