Skip to content

Commit

Permalink
Fix test_log_handler
Browse files Browse the repository at this point in the history
- test__read_for_celery_executor_fallbacks_to_worker
- logical_date -> execution_date
- test_interleave_logs_correct_ordering
  • Loading branch information
jason810496 committed Jan 23, 2025
1 parent bca4a71 commit cc217a5
Showing 1 changed file with 3 additions and 55 deletions.
58 changes: 3 additions & 55 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@
from tests.test_utils.config import conf_vars
from tests.test_utils.file_task_handler import (
log_str_to_parsed_log_stream,
mark_test_for_old_read_log_method,
mark_test_for_stream_based_read_log_method,
)

pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
Expand Down Expand Up @@ -284,6 +282,7 @@ def test__read_when_local(self, mock_read_local, create_task_instance):
dag_id="dag_for_testing_local_log_read",
task_id="task_for_testing_local_log_read",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)
fth = FileTaskHandler("")
log_stream, metadata_array = fth._read(ti=local_log_file_read, try_number=1)
Expand Down Expand Up @@ -371,58 +370,6 @@ def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instanc
else:
mock_k8s_get_task_log.assert_not_called()

@mark_test_for_old_read_log_method
def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance):
"""Test for executors which do not have `get_task_log` method, it fallbacks to reading
log from worker"""
executor_name = "CeleryExecutor"
ti = create_task_instance(
dag_id="dag_for_testing_celery_executor_log_read",
task_id="task_for_testing_celery_executor_log_read",
run_type=DagRunType.SCHEDULED,
)
ti.state = TaskInstanceState.RUNNING
ti.try_number = 1
with conf_vars({("core", "executor"): executor_name}):
reload(executor_loader)
fth = FileTaskHandler("")
fth._read_from_logs_server = mock.Mock()
fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]
actual = fth._read(ti=ti, try_number=1)
fth._read_from_logs_server.assert_called_once()
assert "*** this message\n" in actual[0]
assert actual[0].endswith("this\nlog\ncontent")
assert actual[1] == {"end_of_log": False, "log_pos": 16}

@mark_test_for_stream_based_read_log_method
def test_stream_based__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance):
"""Test for executors which do not have `get_task_log` method, it fallbacks to reading
log from worker"""
executor_name = "CeleryExecutor"
ti = create_task_instance(
dag_id="dag_for_testing_celery_executor_log_read",
task_id="task_for_testing_celery_executor_log_read",
run_type=DagRunType.SCHEDULED,
)
ti.state = TaskInstanceState.RUNNING
ti.try_number = 1
with conf_vars({("core", "executor"): executor_name}):
reload(executor_loader)
fth = FileTaskHandler("")

fth._read_from_logs_server = mock.Mock()
fth._read_from_logs_server.return_value = (
["this message"],
[log_str_to_parsed_log_stream("this\nlog\ncontent")],
len("this\nlog\ncontent"),
)
log_stream, metadata = fth._read(ti=ti, try_number=1)
log_str = "\n".join(line for line in log_stream)
fth._read_from_logs_server.assert_called_once()
assert "*** this message\n" in log_str
assert log_str.endswith("this\nlog\ncontent")
assert metadata == {"end_of_log": False, "log_pos": 16}

@pytest.mark.parametrize(
"remote_logs, local_logs, served_logs_checked",
[
Expand Down Expand Up @@ -450,6 +397,7 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs(
dag_id="dag_for_testing_celery_executor_log_read",
task_id="task_for_testing_celery_executor_log_read",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)
ti.state = TaskInstanceState.SUCCESS # we're testing scenario when task is done
with conf_vars({("core", "executor"): executor_name}):
Expand Down Expand Up @@ -853,7 +801,7 @@ def test_interleave_logs_correct_ordering():
"""

parsed_log_streams = [
log_str_to_parsed_log_stream(long_sample),
log_str_to_parsed_log_stream(sample_with_dupe),
log_str_to_parsed_log_stream(sample_with_dupe),
]
assert sample_with_dupe == "\n".join(_interleave_logs(*parsed_log_streams))
Expand Down

0 comments on commit cc217a5

Please sign in to comment.