From e27d799752240cff11e6937e285db35f16c64132 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Mon, 15 Jan 2024 15:53:07 +0100 Subject: [PATCH] Use the transaction methods for committing, and implicitly roll back. Seems a lot safer than implementing this machinery ourselves. --- crates/query-engine/execution/src/mutation.rs | 5 ++- crates/query-engine/execution/src/query.rs | 41 +++++++++---------- crates/query-engine/sql/src/sql/ast.rs | 13 +++--- crates/query-engine/sql/src/sql/convert.rs | 25 +++++------ .../sql/src/sql/execution_plan.rs | 14 ++----- crates/query-engine/sql/src/sql/helpers.rs | 33 +++++++-------- .../snapshots/tests__mutations__simple.snap | 9 ++-- ...tests__transaction__select_with_limit.snap | 11 ++--- 8 files changed, 68 insertions(+), 83 deletions(-) diff --git a/crates/query-engine/execution/src/mutation.rs b/crates/query-engine/execution/src/mutation.rs index 9f962f74..2844c95d 100644 --- a/crates/query-engine/execution/src/mutation.rs +++ b/crates/query-engine/execution/src/mutation.rs @@ -30,7 +30,10 @@ pub async fn execute( let query_timer = metrics.time_query_execution(); let rows_result = execute_mutations(transaction.as_mut(), database_info, plan).await; - query_timer.complete_with(rows_result) + let rows = query_timer.complete_with(rows_result)?; + + transaction.commit().await?; + Ok(rows) } /// Run mutations, returning a result for each. diff --git a/crates/query-engine/execution/src/query.rs b/crates/query-engine/execution/src/query.rs index 034b7fe9..1a1d4bcf 100644 --- a/crates/query-engine/execution/src/query.rs +++ b/crates/query-engine/execution/src/query.rs @@ -6,9 +6,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use query_engine_sql::sql::execution_plan::{ExecutionPlan, Query}; use serde_json; use sqlformat; -use sqlx; -use sqlx::pool::PoolConnection; -use sqlx::{Postgres, Row}; +use sqlx::{self, Row}; use tracing::{info_span, Instrument}; use crate::database_info::DatabaseInfo; @@ -23,19 +21,19 @@ pub async fn execute( plan: ExecutionPlan, ) -> Result { let acquisition_timer = metrics.time_connection_acquisition_wait(); - let connection_result = pool - .acquire() + let transaction_result = pool + .begin() .instrument(info_span!("Acquire connection")) .await; - let mut connection = acquisition_timer - .complete_with(connection_result) + let mut transaction = acquisition_timer + .complete_with(transaction_result) .map_err(|err| { metrics.error_metrics.record_connection_acquisition_error(); err })?; let query_timer = metrics.time_query_execution(); - let rows_result = execute_query(&mut connection, database_info, plan).await; + let rows_result = execute_query(&mut transaction, database_info, plan).await; query_timer.complete_with(rows_result) } @@ -68,20 +66,19 @@ pub async fn explain( // Otherwise, we proceed as usual. else { let acquisition_timer = metrics.time_connection_acquisition_wait(); - let connection_result = pool - .acquire() + let transaction_result = pool + .begin() .instrument(info_span!("Acquire connection")) .await; - let mut connection = - acquisition_timer - .complete_with(connection_result) - .map_err(|err| { - metrics.error_metrics.record_connection_acquisition_error(); - err - })?; + let mut transaction = acquisition_timer + .complete_with(transaction_result) + .map_err(|err| { + metrics.error_metrics.record_connection_acquisition_error(); + err + })?; for statement in plan.pre { - execute_statement(&mut connection, &statement).await?; + execute_statement(&mut transaction, &statement).await?; } tracing::info!( @@ -97,7 +94,7 @@ pub async fn explain( let rows: Vec = { // run and fetch from the database sqlx_query - .fetch_all(connection.as_mut()) + .fetch_all(transaction.as_mut()) .instrument(info_span!( "Database request", internal.visibility = "user", @@ -123,7 +120,7 @@ pub async fn explain( } for statement in plan.post { - execute_statement(&mut connection, &statement).await? + execute_statement(&mut transaction, &statement).await? } Ok::(results.join("\n")) } @@ -140,7 +137,7 @@ pub async fn explain( /// Execute the query and return the result as bytes. async fn execute_query( - connection: &mut PoolConnection, + connection: &mut sqlx::PgConnection, database_info: &DatabaseInfo, plan: ExecutionPlan, ) -> Result { @@ -260,7 +257,7 @@ fn variables_to_json( /// Execute a sql statement against the database. async fn execute_statement( - connection: &mut PoolConnection, + connection: &mut sqlx::PgConnection, sql::string::Statement(statement): &sql::string::Statement, ) -> Result<(), Error> { tracing::info!( diff --git a/crates/query-engine/sql/src/sql/ast.rs b/crates/query-engine/sql/src/sql/ast.rs index 23a74157..0027a4ea 100644 --- a/crates/query-engine/sql/src/sql/ast.rs +++ b/crates/query-engine/sql/src/sql/ast.rs @@ -334,15 +334,12 @@ pub struct ColumnAlias { /// Transactions manipulation pub mod transaction { - /// Begin a transaction - pub struct Begin { - pub isolation_level: IsolationLevel, - pub transaction_mode: TransactionMode, + /// Set properties for a transaction + pub enum SetTransaction { + IsolationLevel(IsolationLevel), + Mode(Mode), } - /// Commit a transaction - pub struct Commit {} - #[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize, schemars::JsonSchema)] /// The isolation level of transactions pub enum IsolationLevel { @@ -353,7 +350,7 @@ pub mod transaction { } /// The transaction mode of transactions - pub enum TransactionMode { + pub enum Mode { ReadWrite, ReadOnly, } diff --git a/crates/query-engine/sql/src/sql/convert.rs b/crates/query-engine/sql/src/sql/convert.rs index d85a9bf2..b9380013 100644 --- a/crates/query-engine/sql/src/sql/convert.rs +++ b/crates/query-engine/sql/src/sql/convert.rs @@ -554,12 +554,15 @@ impl OrderByDirection { } } -impl transaction::Begin { +impl transaction::SetTransaction { pub fn to_sql(&self, sql: &mut SQL) { - sql.append_syntax("BEGIN "); - self.isolation_level.to_sql(sql); - sql.append_syntax(" "); - self.transaction_mode.to_sql(sql); + sql.append_syntax("SET TRANSACTION "); + match self { + transaction::SetTransaction::IsolationLevel(isolation_level) => { + isolation_level.to_sql(sql) + } + transaction::SetTransaction::Mode(mode) => mode.to_sql(sql), + } } } @@ -574,17 +577,11 @@ impl transaction::IsolationLevel { } } -impl transaction::TransactionMode { +impl transaction::Mode { pub fn to_sql(&self, sql: &mut SQL) { match self { - transaction::TransactionMode::ReadWrite => sql.append_syntax("READ WRITE"), - transaction::TransactionMode::ReadOnly => sql.append_syntax("READ ONLY"), + transaction::Mode::ReadWrite => sql.append_syntax("READ WRITE"), + transaction::Mode::ReadOnly => sql.append_syntax("READ ONLY"), } } } - -impl transaction::Commit { - pub fn to_sql(&self, sql: &mut SQL) { - sql.append_syntax("COMMIT"); - } -} diff --git a/crates/query-engine/sql/src/sql/execution_plan.rs b/crates/query-engine/sql/src/sql/execution_plan.rs index 2262c88b..52010fd2 100644 --- a/crates/query-engine/sql/src/sql/execution_plan.rs +++ b/crates/query-engine/sql/src/sql/execution_plan.rs @@ -56,16 +56,13 @@ pub fn simple_query_execution_plan( query: sql::ast::Select, ) -> ExecutionPlan { ExecutionPlan { - pre: sql::helpers::begin( - isolation_level, - sql::ast::transaction::TransactionMode::ReadOnly, - ), + pre: sql::helpers::set_transaction(isolation_level, sql::ast::transaction::Mode::ReadOnly), query: Query { variables, root_field, query, }, - post: sql::helpers::commit(), + post: vec![], } } @@ -98,11 +95,8 @@ pub fn simple_mutations_execution_plan( mutations: Vec, ) -> ExecutionPlan { ExecutionPlan { - pre: sql::helpers::begin( - isolation_level, - sql::ast::transaction::TransactionMode::ReadWrite, - ), + pre: sql::helpers::set_transaction(isolation_level, sql::ast::transaction::Mode::ReadWrite), query: Mutations(mutations), - post: sql::helpers::commit(), + post: vec![], } } diff --git a/crates/query-engine/sql/src/sql/helpers.rs b/crates/query-engine/sql/src/sql/helpers.rs index 117b35e7..fde65379 100644 --- a/crates/query-engine/sql/src/sql/helpers.rs +++ b/crates/query-engine/sql/src/sql/helpers.rs @@ -619,25 +619,20 @@ pub const VARIABLES_OBJECT_PLACEHOLDER: &str = "%VARIABLES_OBJECT_PLACEHOLDER"; /// SQL field name to be used for ordering results with multiple variable sets. pub const VARIABLE_ORDER_FIELD: &str = "%variable_order"; -pub fn begin( +pub fn set_transaction( isolation_level: &transaction::IsolationLevel, - transaction_mode: transaction::TransactionMode, + transaction_mode: transaction::Mode, ) -> Vec { - vec![{ - let mut sql = string::SQL::new(); - transaction::Begin { - isolation_level: isolation_level.clone(), - transaction_mode, - } - .to_sql(&mut sql); - string::Statement(sql) - }] -} - -pub fn commit() -> Vec { - vec![{ - let mut sql = string::SQL::new(); - transaction::Commit {}.to_sql(&mut sql); - string::Statement(sql) - }] + vec![ + { + let mut sql = string::SQL::new(); + transaction::SetTransaction::IsolationLevel(isolation_level.clone()).to_sql(&mut sql); + string::Statement(sql) + }, + { + let mut sql = string::SQL::new(); + transaction::SetTransaction::Mode(transaction_mode).to_sql(&mut sql); + string::Statement(sql) + }, + ] } diff --git a/crates/query-engine/translation/tests/snapshots/tests__mutations__simple.snap b/crates/query-engine/translation/tests/snapshots/tests__mutations__simple.snap index eafcb5ce..595adc5f 100644 --- a/crates/query-engine/translation/tests/snapshots/tests__mutations__simple.snap +++ b/crates/query-engine/translation/tests/snapshots/tests__mutations__simple.snap @@ -2,8 +2,11 @@ source: crates/query-engine/translation/tests/tests.rs expression: result --- -BEGIN -ISOLATION LEVEL READ COMMITTED READ WRITE; +SET + TRANSACTION ISOLATION LEVEL READ COMMITTED; + +SET + TRANSACTION READ WRITE; WITH "%0_NATIVE_QUERY_delete_playlist_track" AS ( DELETE FROM @@ -42,6 +45,4 @@ FROM ) AS "%4_aggregates" ) AS "%2_universe"; -COMMIT; - [[]] diff --git a/crates/query-engine/translation/tests/snapshots/tests__transaction__select_with_limit.snap b/crates/query-engine/translation/tests/snapshots/tests__transaction__select_with_limit.snap index ed8a9ed5..6dee65b3 100644 --- a/crates/query-engine/translation/tests/snapshots/tests__transaction__select_with_limit.snap +++ b/crates/query-engine/translation/tests/snapshots/tests__transaction__select_with_limit.snap @@ -2,8 +2,11 @@ source: crates/query-engine/translation/tests/tests.rs expression: result --- -BEGIN -ISOLATION LEVEL SERIALIZABLE READ ONLY; +SET + TRANSACTION ISOLATION LEVEL SERIALIZABLE; + +SET + TRANSACTION READ ONLY; SELECT coalesce(json_agg(row_to_json("%1_universe")), '[]') AS "universe" @@ -25,8 +28,6 @@ FROM 5 OFFSET 3 ) AS "%2_rows" ) AS "%2_rows" - ) AS "%1_universe"; - -COMMIT + ) AS "%1_universe" []