Skip to content

Commit

Permalink
Use the transaction methods for committing, and implicitly roll back.
Browse files Browse the repository at this point in the history
Seems a lot safer than implementing this machinery ourselves.
  • Loading branch information
SamirTalwar committed Jan 15, 2024
1 parent cd4247e commit e27d799
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 83 deletions.
5 changes: 4 additions & 1 deletion crates/query-engine/execution/src/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ pub async fn execute(

let query_timer = metrics.time_query_execution();
let rows_result = execute_mutations(transaction.as_mut(), database_info, plan).await;
query_timer.complete_with(rows_result)
let rows = query_timer.complete_with(rows_result)?;

transaction.commit().await?;
Ok(rows)
}

/// Run mutations, returning a result for each.
Expand Down
41 changes: 19 additions & 22 deletions crates/query-engine/execution/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use query_engine_sql::sql::execution_plan::{ExecutionPlan, Query};
use serde_json;
use sqlformat;
use sqlx;
use sqlx::pool::PoolConnection;
use sqlx::{Postgres, Row};
use sqlx::{self, Row};
use tracing::{info_span, Instrument};

use crate::database_info::DatabaseInfo;
Expand All @@ -23,19 +21,19 @@ pub async fn execute(
plan: ExecutionPlan<Query>,
) -> Result<Bytes, Error> {
let acquisition_timer = metrics.time_connection_acquisition_wait();
let connection_result = pool
.acquire()
let transaction_result = pool
.begin()
.instrument(info_span!("Acquire connection"))
.await;
let mut connection = acquisition_timer
.complete_with(connection_result)
let mut transaction = acquisition_timer
.complete_with(transaction_result)
.map_err(|err| {
metrics.error_metrics.record_connection_acquisition_error();
err
})?;

let query_timer = metrics.time_query_execution();
let rows_result = execute_query(&mut connection, database_info, plan).await;
let rows_result = execute_query(&mut transaction, database_info, plan).await;

query_timer.complete_with(rows_result)
}
Expand Down Expand Up @@ -68,20 +66,19 @@ pub async fn explain(
// Otherwise, we proceed as usual.
else {
let acquisition_timer = metrics.time_connection_acquisition_wait();
let connection_result = pool
.acquire()
let transaction_result = pool
.begin()
.instrument(info_span!("Acquire connection"))
.await;
let mut connection =
acquisition_timer
.complete_with(connection_result)
.map_err(|err| {
metrics.error_metrics.record_connection_acquisition_error();
err
})?;
let mut transaction = acquisition_timer
.complete_with(transaction_result)
.map_err(|err| {
metrics.error_metrics.record_connection_acquisition_error();
err
})?;

for statement in plan.pre {
execute_statement(&mut connection, &statement).await?;
execute_statement(&mut transaction, &statement).await?;
}

tracing::info!(
Expand All @@ -97,7 +94,7 @@ pub async fn explain(
let rows: Vec<sqlx::postgres::PgRow> = {
// run and fetch from the database
sqlx_query
.fetch_all(connection.as_mut())
.fetch_all(transaction.as_mut())
.instrument(info_span!(
"Database request",
internal.visibility = "user",
Expand All @@ -123,7 +120,7 @@ pub async fn explain(
}

for statement in plan.post {
execute_statement(&mut connection, &statement).await?
execute_statement(&mut transaction, &statement).await?
}
Ok::<String, Error>(results.join("\n"))
}
Expand All @@ -140,7 +137,7 @@ pub async fn explain(

/// Execute the query and return the result as bytes.
async fn execute_query(
connection: &mut PoolConnection<Postgres>,
connection: &mut sqlx::PgConnection,
database_info: &DatabaseInfo,
plan: ExecutionPlan<Query>,
) -> Result<Bytes, Error> {
Expand Down Expand Up @@ -260,7 +257,7 @@ fn variables_to_json(

/// Execute a sql statement against the database.
async fn execute_statement(
connection: &mut PoolConnection<Postgres>,
connection: &mut sqlx::PgConnection,
sql::string::Statement(statement): &sql::string::Statement,
) -> Result<(), Error> {
tracing::info!(
Expand Down
13 changes: 5 additions & 8 deletions crates/query-engine/sql/src/sql/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,12 @@ pub struct ColumnAlias {

/// Transactions manipulation
pub mod transaction {
/// Begin a transaction
pub struct Begin {
pub isolation_level: IsolationLevel,
pub transaction_mode: TransactionMode,
/// Set properties for a transaction
pub enum SetTransaction {
IsolationLevel(IsolationLevel),
Mode(Mode),
}

/// Commit a transaction
pub struct Commit {}

#[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize, schemars::JsonSchema)]
/// The isolation level of transactions
pub enum IsolationLevel {
Expand All @@ -353,7 +350,7 @@ pub mod transaction {
}

/// The transaction mode of transactions
pub enum TransactionMode {
pub enum Mode {
ReadWrite,
ReadOnly,
}
Expand Down
25 changes: 11 additions & 14 deletions crates/query-engine/sql/src/sql/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,12 +554,15 @@ impl OrderByDirection {
}
}

impl transaction::Begin {
impl transaction::SetTransaction {
pub fn to_sql(&self, sql: &mut SQL) {
sql.append_syntax("BEGIN ");
self.isolation_level.to_sql(sql);
sql.append_syntax(" ");
self.transaction_mode.to_sql(sql);
sql.append_syntax("SET TRANSACTION ");
match self {
transaction::SetTransaction::IsolationLevel(isolation_level) => {
isolation_level.to_sql(sql)
}
transaction::SetTransaction::Mode(mode) => mode.to_sql(sql),
}
}
}

Expand All @@ -574,17 +577,11 @@ impl transaction::IsolationLevel {
}
}

impl transaction::TransactionMode {
impl transaction::Mode {
pub fn to_sql(&self, sql: &mut SQL) {
match self {
transaction::TransactionMode::ReadWrite => sql.append_syntax("READ WRITE"),
transaction::TransactionMode::ReadOnly => sql.append_syntax("READ ONLY"),
transaction::Mode::ReadWrite => sql.append_syntax("READ WRITE"),
transaction::Mode::ReadOnly => sql.append_syntax("READ ONLY"),
}
}
}

impl transaction::Commit {
pub fn to_sql(&self, sql: &mut SQL) {
sql.append_syntax("COMMIT");
}
}
14 changes: 4 additions & 10 deletions crates/query-engine/sql/src/sql/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,13 @@ pub fn simple_query_execution_plan(
query: sql::ast::Select,
) -> ExecutionPlan<Query> {
ExecutionPlan {
pre: sql::helpers::begin(
isolation_level,
sql::ast::transaction::TransactionMode::ReadOnly,
),
pre: sql::helpers::set_transaction(isolation_level, sql::ast::transaction::Mode::ReadOnly),
query: Query {
variables,
root_field,
query,
},
post: sql::helpers::commit(),
post: vec![],
}
}

Expand Down Expand Up @@ -98,11 +95,8 @@ pub fn simple_mutations_execution_plan(
mutations: Vec<Mutation>,
) -> ExecutionPlan<Mutations> {
ExecutionPlan {
pre: sql::helpers::begin(
isolation_level,
sql::ast::transaction::TransactionMode::ReadWrite,
),
pre: sql::helpers::set_transaction(isolation_level, sql::ast::transaction::Mode::ReadWrite),
query: Mutations(mutations),
post: sql::helpers::commit(),
post: vec![],
}
}
33 changes: 14 additions & 19 deletions crates/query-engine/sql/src/sql/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,25 +619,20 @@ pub const VARIABLES_OBJECT_PLACEHOLDER: &str = "%VARIABLES_OBJECT_PLACEHOLDER";
/// SQL field name to be used for ordering results with multiple variable sets.
pub const VARIABLE_ORDER_FIELD: &str = "%variable_order";

pub fn begin(
pub fn set_transaction(
isolation_level: &transaction::IsolationLevel,
transaction_mode: transaction::TransactionMode,
transaction_mode: transaction::Mode,
) -> Vec<string::Statement> {
vec![{
let mut sql = string::SQL::new();
transaction::Begin {
isolation_level: isolation_level.clone(),
transaction_mode,
}
.to_sql(&mut sql);
string::Statement(sql)
}]
}

pub fn commit() -> Vec<string::Statement> {
vec![{
let mut sql = string::SQL::new();
transaction::Commit {}.to_sql(&mut sql);
string::Statement(sql)
}]
vec![
{
let mut sql = string::SQL::new();
transaction::SetTransaction::IsolationLevel(isolation_level.clone()).to_sql(&mut sql);
string::Statement(sql)
},
{
let mut sql = string::SQL::new();
transaction::SetTransaction::Mode(transaction_mode).to_sql(&mut sql);
string::Statement(sql)
},
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
source: crates/query-engine/translation/tests/tests.rs
expression: result
---
BEGIN
ISOLATION LEVEL READ COMMITTED READ WRITE;
SET
TRANSACTION ISOLATION LEVEL READ COMMITTED;

SET
TRANSACTION READ WRITE;

WITH "%0_NATIVE_QUERY_delete_playlist_track" AS (
DELETE FROM
Expand Down Expand Up @@ -42,6 +45,4 @@ FROM
) AS "%4_aggregates"
) AS "%2_universe";

COMMIT;

[[]]
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
source: crates/query-engine/translation/tests/tests.rs
expression: result
---
BEGIN
ISOLATION LEVEL SERIALIZABLE READ ONLY;
SET
TRANSACTION ISOLATION LEVEL SERIALIZABLE;

SET
TRANSACTION READ ONLY;

SELECT
coalesce(json_agg(row_to_json("%1_universe")), '[]') AS "universe"
Expand All @@ -25,8 +28,6 @@ FROM
5 OFFSET 3
) AS "%2_rows"
) AS "%2_rows"
) AS "%1_universe";

COMMIT
) AS "%1_universe"

[]

0 comments on commit e27d799

Please sign in to comment.