From 6c4010d0734aa4237ce8dcfa1cddbc07700d75f2 Mon Sep 17 00:00:00 2001 From: Maksim Moiseenkov Date: Tue, 25 Jul 2023 14:44:23 +0000 Subject: [PATCH] Refactor SqlAlchemy session.execute() calls to 2.0 style in case of plain text SQL queries --- airflow/utils/db.py | 5 ++--- tests/utils/test_db_cleanup.py | 3 ++- tests/utils/test_sqlalchemy.py | 13 +++++++------ 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 9d82114c562b3..1a73d65db80fd 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1222,9 +1222,8 @@ def _create_table_as( ) else: # Postgres and SQLite both support the same "CREATE TABLE a AS SELECT ..." syntax - session.execute( - f"CREATE TABLE {target_table_name} AS {source_query.selectable.compile(bind=session.get_bind())}" - ) + select_table = source_query.selectable.compile(bind=session.get_bind()) + session.execute(text(f"CREATE TABLE {target_table_name} AS {select_table}")) def _move_dangling_data_to_new_table( diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py index 80419cb9437a3..570a8711bfcd5 100644 --- a/tests/utils/test_db_cleanup.py +++ b/tests/utils/test_db_cleanup.py @@ -28,6 +28,7 @@ import pendulum import pytest from pytest import param +from sqlalchemy import text from sqlalchemy.exc import OperationalError from sqlalchemy.ext.declarative import DeclarativeMeta @@ -211,7 +212,7 @@ def test__build_query(self, table_name, date_add_kwargs, expected_to_delete, ext ) stmt = CreateTableAs(target_table_name, query.selectable) session.execute(stmt) - res = session.execute(f"SELECT COUNT(1) FROM {target_table_name}") + res = session.execute(text(f"SELECT COUNT(1) FROM {target_table_name}")) for row in res: assert row[0] == expected_to_delete diff --git a/tests/utils/test_sqlalchemy.py b/tests/utils/test_sqlalchemy.py index 2bf4ad1be5bbd..fdc6fb309d801 100644 --- a/tests/utils/test_sqlalchemy.py +++ b/tests/utils/test_sqlalchemy.py @@ -26,6 +26,7 @@ import pytest from kubernetes.client import models as k8s from pytest import param +from sqlalchemy import text from sqlalchemy.exc import StatementError from airflow import settings @@ -54,7 +55,7 @@ def setup_method(self): # make sure NOT to run in UTC. Only postgres supports storing # timezone information in the datetime field if session.bind.dialect.name == "postgresql": - session.execute("SET timezone='Europe/Amsterdam'") + session.execute(text("SET timezone='Europe/Amsterdam'")) self.session = session @@ -208,17 +209,17 @@ def test_with_row_locks( def test_prohibit_commit(self): with prohibit_commit(self.session) as guard: - self.session.execute("SELECT 1") + self.session.execute(text("SELECT 1")) with pytest.raises(RuntimeError): self.session.commit() self.session.rollback() - self.session.execute("SELECT 1") + self.session.execute(text("SELECT 1")) guard.commit() # Check the expected_commit is reset with pytest.raises(RuntimeError): - self.session.execute("SELECT 1") + self.session.execute(text("SELECT 1")) self.session.commit() def test_prohibit_commit_specific_session_only(self): @@ -233,12 +234,12 @@ def test_prohibit_commit_specific_session_only(self): assert other_session is not self.session with prohibit_commit(self.session): - self.session.execute("SELECT 1") + self.session.execute(text("SELECT 1")) with pytest.raises(RuntimeError): self.session.commit() self.session.rollback() - other_session.execute("SELECT 1") + other_session.execute(text("SELECT 1")) other_session.commit() def teardown_method(self):