-
Notifications
You must be signed in to change notification settings - Fork 250
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: generic query building #5127
Changes from 1 commit
1d35fa4
3413289
f206d86
e825ee9
534f3c5
7eb0f3d
b013cb8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,7 @@ use super::update::*; | |
use crate::row::ToSqlRow; | ||
use crate::value::to_prisma_value; | ||
use crate::{error::SqlError, QueryExt, Queryable}; | ||
use itertools::Itertools; | ||
use quaint::ast::{Insert, Query}; | ||
use quaint::ast::Query; | ||
use quaint::prelude::ResultSet; | ||
use quaint::{ | ||
error::ErrorKind, | ||
|
@@ -12,32 +11,9 @@ use quaint::{ | |
use query_structure::*; | ||
use sql_query_builder::{column_metadata, write, Context, FilterBuilder, SelectionResultExt, SqlTraceComment}; | ||
use std::borrow::Cow; | ||
use std::{ | ||
collections::{HashMap, HashSet}, | ||
ops::Deref, | ||
}; | ||
use std::collections::HashMap; | ||
use user_facing_errors::query_engine::DatabaseConstraint; | ||
|
||
#[cfg(target_arch = "wasm32")] | ||
macro_rules! trace { | ||
(target: $target:expr, $($arg:tt)+) => {{ | ||
// No-op in WebAssembly | ||
}}; | ||
($($arg:tt)+) => {{ | ||
// No-op in WebAssembly | ||
}}; | ||
} | ||
|
||
#[cfg(not(target_arch = "wasm32"))] | ||
macro_rules! trace { | ||
(target: $target:expr, $($arg:tt)+) => { | ||
tracing::log::trace!(target: $target, $($arg)+); | ||
}; | ||
($($arg:tt)+) => { | ||
tracing::log::trace!($($arg)+); | ||
}; | ||
} | ||
|
||
async fn generate_id( | ||
conn: &dyn Queryable, | ||
id_field: &FieldSelection, | ||
|
@@ -191,49 +167,6 @@ pub(crate) async fn create_record( | |
} | ||
} | ||
|
||
/// Returns a set of fields that are used in the arguments for the create operation. | ||
fn collect_affected_fields(args: &[WriteArgs], model: &Model) -> HashSet<ScalarFieldRef> { | ||
let mut fields = HashSet::new(); | ||
args.iter().for_each(|arg| fields.extend(arg.keys())); | ||
|
||
fields | ||
.into_iter() | ||
.map(|dsfn| model.fields().scalar().find(|sf| sf.db_name() == dsfn.deref()).unwrap()) | ||
.collect() | ||
} | ||
|
||
/// Generates a list of insert statements to execute. If `selected_fields` is set, insert statements | ||
/// will return the specified columns of inserted rows. | ||
pub fn generate_insert_statements( | ||
model: &Model, | ||
args: Vec<WriteArgs>, | ||
skip_duplicates: bool, | ||
selected_fields: Option<&ModelProjection>, | ||
ctx: &Context<'_>, | ||
) -> Vec<Insert<'static>> { | ||
let affected_fields = collect_affected_fields(&args, model); | ||
|
||
if affected_fields.is_empty() { | ||
args.into_iter() | ||
.map(|_| write::create_records_empty(model, skip_duplicates, selected_fields, ctx)) | ||
.collect() | ||
} else { | ||
let partitioned_batches = partition_into_batches(args, ctx); | ||
trace!("Total of {} batches to be executed.", partitioned_batches.len()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've removed this trace, not sure if we'll need it since this can be introspected pretty easy now that query building is separated, but I can add it back, it'll require depending on |
||
trace!( | ||
"Batch sizes: {:?}", | ||
partitioned_batches.iter().map(|b| b.len()).collect_vec() | ||
); | ||
|
||
partitioned_batches | ||
.into_iter() | ||
.map(|batch| { | ||
write::create_records_nonempty(model, batch, skip_duplicates, &affected_fields, selected_fields, ctx) | ||
}) | ||
.collect() | ||
} | ||
} | ||
|
||
/// Inserts records specified as a list of `WriteArgs`. Returns number of inserted records. | ||
pub(crate) async fn create_records_count( | ||
conn: &dyn Queryable, | ||
|
@@ -242,7 +175,7 @@ pub(crate) async fn create_records_count( | |
skip_duplicates: bool, | ||
ctx: &Context<'_>, | ||
) -> crate::Result<usize> { | ||
let inserts = generate_insert_statements(model, args, skip_duplicates, None, ctx); | ||
let inserts = write::generate_insert_statements(model, args, skip_duplicates, None, ctx); | ||
let mut count = 0; | ||
for insert in inserts { | ||
count += conn.execute(insert.into()).await?; | ||
|
@@ -265,7 +198,7 @@ pub(crate) async fn create_records_returning( | |
let idents = selected_fields.type_identifiers_with_arities(); | ||
let meta = column_metadata::create(&field_names, &idents); | ||
let mut records = ManyRecords::new(field_names.clone()); | ||
let inserts = generate_insert_statements(model, args, skip_duplicates, Some(&selected_fields.into()), ctx); | ||
let inserts = write::generate_insert_statements(model, args, skip_duplicates, Some(&selected_fields.into()), ctx); | ||
|
||
for insert in inserts { | ||
let result_set = conn.query(insert.into()).await?; | ||
|
@@ -281,74 +214,6 @@ pub(crate) async fn create_records_returning( | |
Ok(records) | ||
} | ||
|
||
/// Partitions data into batches, respecting `max_bind_values` and `max_insert_rows` settings from | ||
/// the `Context`. | ||
fn partition_into_batches(args: Vec<WriteArgs>, ctx: &Context<'_>) -> Vec<Vec<WriteArgs>> { | ||
let batches = if let Some(max_params) = ctx.max_bind_values() { | ||
// We need to split inserts if they are above a parameter threshold, as well as split based on number of rows. | ||
// -> Horizontal partitioning by row number, vertical by number of args. | ||
args.into_iter() | ||
.peekable() | ||
.batching(|iter| { | ||
let mut param_count: usize = 0; | ||
let mut batch = vec![]; | ||
|
||
while param_count < max_params { | ||
// If the param count _including_ the next item doens't exceed the limit, | ||
// we continue filling up the current batch. | ||
let proceed = match iter.peek() { | ||
Some(next) => (param_count + next.len()) <= max_params, | ||
None => break, | ||
}; | ||
|
||
if proceed { | ||
match iter.next() { | ||
Some(next) => { | ||
param_count += next.len(); | ||
batch.push(next) | ||
} | ||
None => break, | ||
} | ||
} else { | ||
break; | ||
} | ||
} | ||
|
||
if batch.is_empty() { | ||
None | ||
} else { | ||
Some(batch) | ||
} | ||
}) | ||
.collect_vec() | ||
} else { | ||
vec![args] | ||
}; | ||
|
||
if let Some(max_rows) = ctx.max_insert_rows() { | ||
let capacity = batches.len(); | ||
batches | ||
.into_iter() | ||
.fold(Vec::with_capacity(capacity), |mut batches, next_batch| { | ||
if next_batch.len() > max_rows { | ||
batches.extend( | ||
next_batch | ||
.into_iter() | ||
.chunks(max_rows) | ||
.into_iter() | ||
.map(|chunk| chunk.into_iter().collect_vec()), | ||
); | ||
} else { | ||
batches.push(next_batch); | ||
} | ||
|
||
batches | ||
}) | ||
} else { | ||
batches | ||
} | ||
} | ||
|
||
/// Update one record in a database defined in `conn` and the records | ||
/// defined in `args`, resulting the identifiers that were modified in the | ||
/// operation. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
mod query; | ||
|
||
use query::translate_query; | ||
use query_builder::QueryBuilder; | ||
use thiserror::Error; | ||
|
||
use crate::{EdgeRef, Node, NodeRef, Query, QueryGraph}; | ||
|
@@ -12,17 +13,17 @@ pub enum TranslateError { | |
#[error("node {0} has no content")] | ||
NodeContentEmpty(String), | ||
|
||
#[error("{0}")] | ||
QuaintError(#[from] quaint::error::Error), | ||
#[error("query builder error: {0}")] | ||
QueryBuildFailure(#[source] Box<dyn std::error::Error + Send + Sync>), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. made the error dynamic too, it might be odd to use a dynamic query builder with non-dynamic errors, but I'm open to suggestions |
||
} | ||
|
||
pub type TranslateResult<T> = Result<T, TranslateError>; | ||
|
||
pub fn translate(mut graph: QueryGraph) -> TranslateResult<Expression> { | ||
pub fn translate(mut graph: QueryGraph, builder: &dyn QueryBuilder) -> TranslateResult<Expression> { | ||
graph | ||
.root_nodes() | ||
.into_iter() | ||
.map(|node| NodeTranslator::new(&mut graph, node, &[]).translate()) | ||
.map(|node| NodeTranslator::new(&mut graph, node, &[], builder).translate()) | ||
.collect::<TranslateResult<Vec<_>>>() | ||
.map(Expression::Seq) | ||
} | ||
|
@@ -32,14 +33,21 @@ struct NodeTranslator<'a, 'b> { | |
node: NodeRef, | ||
#[allow(dead_code)] | ||
parent_edges: &'b [EdgeRef], | ||
query_builder: &'b dyn QueryBuilder, | ||
} | ||
|
||
impl<'a, 'b> NodeTranslator<'a, 'b> { | ||
fn new(graph: &'a mut QueryGraph, node: NodeRef, parent_edges: &'b [EdgeRef]) -> Self { | ||
fn new( | ||
graph: &'a mut QueryGraph, | ||
node: NodeRef, | ||
parent_edges: &'b [EdgeRef], | ||
query_builder: &'b dyn QueryBuilder, | ||
) -> Self { | ||
Self { | ||
graph, | ||
node, | ||
parent_edges, | ||
query_builder, | ||
} | ||
} | ||
|
||
|
@@ -64,7 +72,7 @@ impl<'a, 'b> NodeTranslator<'a, 'b> { | |
.try_into() | ||
.expect("current node must be query"); | ||
|
||
translate_query(query) | ||
translate_query(query, self.query_builder) | ||
} | ||
|
||
#[allow(dead_code)] | ||
|
@@ -99,7 +107,7 @@ impl<'a, 'b> NodeTranslator<'a, 'b> { | |
.into_iter() | ||
.map(|(_, node)| { | ||
let edges = self.graph.incoming_edges(&node); | ||
NodeTranslator::new(self.graph, node, &edges).translate() | ||
NodeTranslator::new(self.graph, node, &edges, self.query_builder).translate() | ||
}) | ||
.collect::<Result<Vec<_>, _>>()?; | ||
|
||
|
@@ -121,7 +129,7 @@ impl<'a, 'b> NodeTranslator<'a, 'b> { | |
.map(|(_, node)| { | ||
let name = node.id(); | ||
let edges = self.graph.incoming_edges(&node); | ||
let expr = NodeTranslator::new(self.graph, node, &edges).translate()?; | ||
let expr = NodeTranslator::new(self.graph, node, &edges, self.query_builder).translate()?; | ||
Ok(Binding { name, expr }) | ||
}) | ||
.collect::<TranslateResult<Vec<_>>>()?; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved this and one other function to the sql builder crate, might wanna sync this with main