Skip to content

Commit

Permalink
turbofish for async result, DataFusionError handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jreuben11 committed Oct 29, 2024
1 parent 1ed1194 commit c86ded6
Showing 1 changed file with 39 additions and 7 deletions.
46 changes: 39 additions & 7 deletions data_fusion/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::SchemaError;
use datafusion::common::DataFusionError;
use datafusion::functions_aggregate::expr_fn::{max, min};
use datafusion::prelude::*;
use std::sync::Arc;
Expand Down Expand Up @@ -99,9 +99,11 @@ async fn df_greatest(batch: &RecordBatch, col_name: &str) -> datafusion::error::
ctx.register_batch("greatest", batch.clone())?;
let df = ctx.table("greatest").await?;

// use max aggregate function on passed in col name
let df = df.aggregate(vec![], vec![max(col(col_name))])?;

Ok(df)
// Turbofish for propagating async results
Ok::<DataFrame, DataFusionError>(df)
}

#[tokio::main]
Expand All @@ -117,9 +119,31 @@ async fn main() -> datafusion::error::Result<()> {
let df2 = df_greatest(&batch, "letter").await?;
df2.show().await?;

let _err1 = match df_greatest(&batch, "blah").await? {
df => Some(df),
_ => None, // TODO: handle different errors from DataFusionError enum https://docs.rs/datafusion-common/latest/datafusion_common/error/enum.DataFusionError.html as well as specifics of SchemaError
match df_greatest(&batch, "blah").await {
Ok(_df) => println!("ok!"),
Err(e) => {
// TODO: custom logic for different error types
match e {
DataFusionError::ArrowError(_, _) => {}
DataFusionError::IoError(_) => {}
DataFusionError::SQL(_, _) => {}
DataFusionError::NotImplemented(_) => {}
DataFusionError::Internal(_) => {}
DataFusionError::Plan(_) => {}
DataFusionError::Configuration(_) => {}
DataFusionError::SchemaError(_, _) => {}
DataFusionError::Execution(_) => {}
DataFusionError::ExecutionJoin(_) => {}
DataFusionError::ResourcesExhausted(_) => {}
DataFusionError::External(_) => {}
DataFusionError::Context(_, _) => {}
DataFusionError::Substrait(_) => {}
// Note: the following 2 enum variants are not listed in the rustdoc:
DataFusionError::ParquetError(_) => {}
DataFusionError::ObjectStore(_) => {}
};
println!("{}", e.message())
}
};
println!("no panic");
Ok(())
Expand Down Expand Up @@ -156,7 +180,6 @@ mod tests {
&batches1
);


let batches2 = df2.collect().await.unwrap();
assert_batches_eq!(
vec![
Expand All @@ -169,6 +192,15 @@ mod tests {
&batches2
);

// TODO: test the non-happy path of errors
let is_schema_error = match df_greatest(&batch, "blah").await {
Ok(_) => false,
Err(e) => {
match e {
DataFusionError::SchemaError(_, _) => true,
_ => false,
}
}
};
assert!(is_schema_error == true);
}
}

0 comments on commit c86ded6

Please sign in to comment.