From a2492dde3fedb1fac6a5b23c38ad0100874f69b3 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Wed, 8 Jan 2025 14:42:48 +0100 Subject: [PATCH] do not instantiate raw CallbackRequest (#45482) Signed-off-by: Maciej Obuchowski --- .../celery/executors/test_celery_kubernetes_executor.py | 9 +++++++-- .../executors/test_local_kubernetes_executor.py | 9 +++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/providers/tests/celery/executors/test_celery_kubernetes_executor.py b/providers/tests/celery/executors/test_celery_kubernetes_executor.py index ce51afea4e8fb..027793c29d527 100644 --- a/providers/tests/celery/executors/test_celery_kubernetes_executor.py +++ b/providers/tests/celery/executors/test_celery_kubernetes_executor.py @@ -21,12 +21,14 @@ import pytest -from airflow.callbacks.callback_requests import CallbackRequest +from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest from airflow.configuration import conf from airflow.providers.celery.executors.celery_executor import CeleryExecutor from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS + KUBERNETES_QUEUE = "kubernetes" @@ -258,7 +260,10 @@ def test_send_callback(self): cel_k8s_exec = CeleryKubernetesExecutor(cel_exec, k8s_exec) cel_k8s_exec.callback_sink = mock.MagicMock() - callback = CallbackRequest(full_filepath="fake") + if AIRFLOW_V_3_0_PLUS: + callback = DagCallbackRequest(full_filepath="fake", dag_id="fake", run_id="fake") + else: + callback = CallbackRequest(full_filepath="fake") cel_k8s_exec.send_callback(callback) cel_k8s_exec.callback_sink.send.assert_called_once_with(callback) diff --git a/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py index 1a745ac139315..be4d8b936739e 100644 --- a/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py +++ b/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py @@ -19,13 +19,15 @@ from unittest import mock -from airflow.callbacks.callback_requests import CallbackRequest +from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest from airflow.configuration import conf from airflow.executors.local_executor import LocalExecutor from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor import ( LocalKubernetesExecutor, ) +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS + class TestLocalKubernetesExecutor: def test_supports_pickling(self): @@ -113,7 +115,10 @@ def test_send_callback(self): local_k8s_exec = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock) local_k8s_exec.callback_sink = mock.MagicMock() - callback = CallbackRequest(full_filepath="fake") + if AIRFLOW_V_3_0_PLUS: + callback = DagCallbackRequest(full_filepath="fake", dag_id="fake", run_id="fake") + else: + callback = CallbackRequest(full_filepath="fake") local_k8s_exec.send_callback(callback) local_k8s_exec.callback_sink.send.assert_called_once_with(callback)