Skip to content

Commit

Permalink
Fix _get_compatible_parse_log_streams
Browse files Browse the repository at this point in the history
- Should handle input list is empty.
  • Loading branch information
jason810496 committed Jan 8, 2025
1 parent 65df8c5 commit bf542f4
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,25 +266,32 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
return val


def _get_compatible_parse_log_stream(remote_logs: list[str]) -> _ParsedLogStreamType:
def _get_compatible_parse_log_streams(remote_logs: list[str]) -> list[_ParsedLogStreamType]:
"""
Compatible utility for new log reading(stream-based + k-way merge log) and old log reading(read whole log in memory + sorting).
Turn old log reading into new stream-based log reading.
Will be removed after all providers adapt to stream-based log reading.
:param remote_logs: list of log lines
:return: parsed log stream
:return: parsed log streams if remote_logs is not empty, otherwise empty list
"""
timestamp = None
next_timestamp = None
for line_num, line in enumerate(remote_logs):
with suppress(Exception):
# next_timestamp unchanged if line can't be parsed
next_timestamp = _parse_timestamp(line)
if next_timestamp:
timestamp = next_timestamp
yield timestamp, line_num, line
if not remote_logs:
# empty remote logs
return []

def _parse_log(logs: list[str]):
timestamp = None
next_timestamp = None
for line_num, line in enumerate(logs):
with suppress(Exception):
# next_timestamp unchanged if line can't be parsed
next_timestamp = _parse_timestamp(line)
if next_timestamp:
timestamp = next_timestamp
yield timestamp, line_num, line

return [_parse_log(remote_logs)]


def _get_compatible_read_for_providers(read_response: tuple) -> tuple[Iterable[str], dict[str, Any]]:
Expand Down Expand Up @@ -525,7 +532,7 @@ def _read(
# old log reading
remote_messages, remote_logs = remote_log_result
remote_logs_size = sum(len(log) for log in remote_logs)
remote_parsed_logs = [_get_compatible_parse_log_stream(remote_logs)]
remote_parsed_logs = _get_compatible_parse_log_streams(remote_logs)
elif len(remote_log_result) == 3:
# new stream-based log reading
remote_messages, remote_parsed_logs, remote_logs_size = remote_log_result
Expand All @@ -539,7 +546,7 @@ def _read(
if response and len(response) == 2:
executor_messages, executor_logs = response
executor_logs_size = sum(len(log) for log in executor_logs)
executor_parsed_logs = [_get_compatible_parse_log_stream(executor_logs)]
executor_parsed_logs = _get_compatible_parse_log_streams(executor_logs)
elif response and len(response) == 3:
executor_messages, executor_parsed_logs, executor_logs_size = response
else:
Expand Down Expand Up @@ -567,7 +574,6 @@ def _read(
)
messages_list.extend(served_messages)

print("messages_list", messages_list)
# Log message source details are grouped: they are not relevant for most users and can
# distract them from finding the root cause of their errors
messages = " INFO - ::group::Log message source details\n"
Expand Down

0 comments on commit bf542f4

Please sign in to comment.