From cc217a530289d7c8edfab235d93206096dafc74c Mon Sep 17 00:00:00 2001 From: jason810496 Date: Thu, 23 Jan 2025 19:31:22 +0800 Subject: [PATCH] Fix test_log_handler - test__read_for_celery_executor_fallbacks_to_worker - logical_date -> execution_date - test_interleave_logs_correct_ordering --- tests/utils/test_log_handlers.py | 58 ++------------------------------ 1 file changed, 3 insertions(+), 55 deletions(-) diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 4ef25ffed60b7..ee628341d95b0 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -60,8 +60,6 @@ from tests.test_utils.config import conf_vars from tests.test_utils.file_task_handler import ( log_str_to_parsed_log_stream, - mark_test_for_old_read_log_method, - mark_test_for_stream_based_read_log_method, ) pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @@ -284,6 +282,7 @@ def test__read_when_local(self, mock_read_local, create_task_instance): dag_id="dag_for_testing_local_log_read", task_id="task_for_testing_local_log_read", run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE, ) fth = FileTaskHandler("") log_stream, metadata_array = fth._read(ti=local_log_file_read, try_number=1) @@ -371,58 +370,6 @@ def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instanc else: mock_k8s_get_task_log.assert_not_called() - @mark_test_for_old_read_log_method - def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance): - """Test for executors which do not have `get_task_log` method, it fallbacks to reading - log from worker""" - executor_name = "CeleryExecutor" - ti = create_task_instance( - dag_id="dag_for_testing_celery_executor_log_read", - task_id="task_for_testing_celery_executor_log_read", - run_type=DagRunType.SCHEDULED, - ) - ti.state = TaskInstanceState.RUNNING - ti.try_number = 1 - with conf_vars({("core", "executor"): executor_name}): - reload(executor_loader) - fth = FileTaskHandler("") - fth._read_from_logs_server = mock.Mock() - fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"] - actual = fth._read(ti=ti, try_number=1) - fth._read_from_logs_server.assert_called_once() - assert "*** this message\n" in actual[0] - assert actual[0].endswith("this\nlog\ncontent") - assert actual[1] == {"end_of_log": False, "log_pos": 16} - - @mark_test_for_stream_based_read_log_method - def test_stream_based__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance): - """Test for executors which do not have `get_task_log` method, it fallbacks to reading - log from worker""" - executor_name = "CeleryExecutor" - ti = create_task_instance( - dag_id="dag_for_testing_celery_executor_log_read", - task_id="task_for_testing_celery_executor_log_read", - run_type=DagRunType.SCHEDULED, - ) - ti.state = TaskInstanceState.RUNNING - ti.try_number = 1 - with conf_vars({("core", "executor"): executor_name}): - reload(executor_loader) - fth = FileTaskHandler("") - - fth._read_from_logs_server = mock.Mock() - fth._read_from_logs_server.return_value = ( - ["this message"], - [log_str_to_parsed_log_stream("this\nlog\ncontent")], - len("this\nlog\ncontent"), - ) - log_stream, metadata = fth._read(ti=ti, try_number=1) - log_str = "\n".join(line for line in log_stream) - fth._read_from_logs_server.assert_called_once() - assert "*** this message\n" in log_str - assert log_str.endswith("this\nlog\ncontent") - assert metadata == {"end_of_log": False, "log_pos": 16} - @pytest.mark.parametrize( "remote_logs, local_logs, served_logs_checked", [ @@ -450,6 +397,7 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs( dag_id="dag_for_testing_celery_executor_log_read", task_id="task_for_testing_celery_executor_log_read", run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE, ) ti.state = TaskInstanceState.SUCCESS # we're testing scenario when task is done with conf_vars({("core", "executor"): executor_name}): @@ -853,7 +801,7 @@ def test_interleave_logs_correct_ordering(): """ parsed_log_streams = [ - log_str_to_parsed_log_stream(long_sample), + log_str_to_parsed_log_stream(sample_with_dupe), log_str_to_parsed_log_stream(sample_with_dupe), ] assert sample_with_dupe == "\n".join(_interleave_logs(*parsed_log_streams))