Renamed fail_stop DAG property to fail_fast (#45229)
hprassad authored and potiuk committed Jan 2, 2025
1 parent f7da5e4 commit d108d45
Showing 6 changed files with 37 additions and 37 deletions.
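For DAG authors, the rename is a straight keyword-argument swap with unchanged semantics; a minimal before/after sketch (the dag_id and other arguments are illustrative, not taken from this commit):

    # Before this commit
    dag = DAG(dag_id="example", schedule=None, fail_stop=True)

    # After this commit
    dag = DAG(dag_id="example", schedule=None, fail_fast=True)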
12 changes: 6 additions & 6 deletions airflow/exceptions.py
@@ -293,23 +293,23 @@ def __init__(self, *args, **kwargs):
         warnings.warn("DagFileExists is deprecated and will be removed.", DeprecationWarning, stacklevel=2)
 
 
-class FailStopDagInvalidTriggerRule(AirflowException):
-    """Raise when a dag has 'fail_stop' enabled yet has a non-default trigger rule."""
+class FailFastDagInvalidTriggerRule(AirflowException):
+    """Raise when a dag has 'fail_fast' enabled yet has a non-default trigger rule."""
 
     _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS)
 
     @classmethod
-    def check(cls, *, fail_stop: bool, trigger_rule: TriggerRule):
+    def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule):
         """
-        Check that fail_stop dag tasks have allowable trigger rules.
+        Check that fail_fast dag tasks have allowable trigger rules.
         :meta private:
         """
-        if fail_stop and trigger_rule not in cls._allowed_rules:
+        if fail_fast and trigger_rule not in cls._allowed_rules:
             raise cls()
 
     def __str__(self) -> str:
-        return f"A 'fail-stop' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule"
+        return f"A 'fail_fast' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule"
 
 
 class DuplicateTaskIdFound(AirflowException):
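A minimal sketch of how the renamed guard is exercised (the check classmethod is marked :meta private:, so this is illustrative rather than a public-API example):

    from airflow.exceptions import FailFastDagInvalidTriggerRule
    from airflow.utils.trigger_rule import TriggerRule

    # The default trigger rule passes the check silently.
    FailFastDagInvalidTriggerRule.check(fail_fast=True, trigger_rule=TriggerRule.ALL_SUCCESS)

    # A non-default trigger rule on a fail_fast dag raises the exception.
    try:
        FailFastDagInvalidTriggerRule.check(fail_fast=True, trigger_rule=TriggerRule.ALWAYS)
    except FailFastDagInvalidTriggerRule as err:
        print(err)  # explains that a fail_fast dag only allows the default trigger rule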
6 changes: 3 additions & 3 deletions airflow/models/dag.py
@@ -418,9 +418,9 @@ class DAG(TaskSDKDag, LoggingMixin):
     Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link.
     e.g: {"dag_owner": "https://airflow.apache.org/"}
     :param auto_register: Automatically register this DAG when it is used in a ``with`` block
-    :param fail_stop: Fails currently running tasks when task in DAG fails.
-        **Warning**: A fail stop dag can only have tasks with the default trigger rule ("all_success").
-        An exception will be thrown if any task in a fail stop dag has a non default trigger rule.
+    :param fail_fast: Fails currently running tasks when task in DAG fails.
+        **Warning**: A fail fast dag can only have tasks with the default trigger rule ("all_success").
+        An exception will be thrown if any task in a fail fast dag has a non default trigger rule.
     :param dag_display_name: The display name of the DAG which appears on the UI.
     """
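A short sketch of the documented behaviour in DAG form (a hypothetical example; the EmptyOperator import path follows Airflow 2.x conventions and may differ on main):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator

    with DAG(
        dag_id="fail_fast_example",
        schedule=None,
        start_date=datetime(2025, 1, 1),
        fail_fast=True,
    ):
        # Both tasks keep the default "all_success" trigger rule, so add_task accepts them;
        # if one fails at runtime, the remaining non-teardown tasks are failed or skipped.
        extract = EmptyOperator(task_id="extract")
        load = EmptyOperator(task_id="load")
        extract >> load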
20 changes: 10 additions & 10 deletions airflow/models/taskinstance.py
@@ -408,10 +408,10 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance, session: Session):
         task = task_instance.task.dag.task_dict[ti.task_id]
         if not task.is_teardown:
             if ti.state == TaskInstanceState.RUNNING:
-                log.info("Forcing task %s to fail due to dag's `fail_stop` setting", ti.task_id)
+                log.info("Forcing task %s to fail due to dag's `fail_fast` setting", ti.task_id)
                 ti.error(session)
             else:
-                log.info("Setting task %s to SKIPPED due to dag's `fail_stop` setting.", ti.task_id)
+                log.info("Setting task %s to SKIPPED due to dag's `fail_fast` setting.", ti.task_id)
                 ti.set_state(state=TaskInstanceState.SKIPPED, session=session)
         else:
             log.info("Not skipping teardown task '%s'", ti.task_id)
@@ -1082,7 +1082,7 @@ def _handle_failure(
     test_mode: bool | None = None,
     context: Context | None = None,
     force_fail: bool = False,
-    fail_stop: bool = False,
+    fail_fast: bool = False,
 ) -> None:
     """
     Handle Failure for a task instance.
@@ -1105,7 +1105,7 @@
         context=context,
         force_fail=force_fail,
         session=session,
-        fail_stop=fail_stop,
+        fail_fast=fail_fast,
     )
 
     _log_state(task_instance=task_instance, lead_msg="Immediate failure requested. " if force_fail else "")
@@ -3048,7 +3048,7 @@ def fetch_handle_failure_context(
         force_fail: bool = False,
         *,
         session: Session,
-        fail_stop: bool = False,
+        fail_fast: bool = False,
     ):
         """
         Fetch the context needed to handle a failure.
@@ -3059,7 +3059,7 @@
         :param context: Jinja2 context
         :param force_fail: if True, task does not retry
         :param session: SQLAlchemy ORM Session
-        :param fail_stop: if True, fail all downstream tasks
+        :param fail_fast: if True, fail all downstream tasks
         """
         if error:
             if isinstance(error, BaseException):
@@ -3116,7 +3116,7 @@
         email_for_state = operator.attrgetter("email_on_failure")
         callbacks = task.on_failure_callback if task else None
 
-        if task and fail_stop:
+        if task and fail_fast:
             _stop_remaining_tasks(task_instance=ti, session=session)
         else:
             if ti.state == TaskInstanceState.RUNNING:
@@ -3173,17 +3173,17 @@ def handle_failure(
         assert self.task
         assert self.task.dag
         try:
-            fail_stop = self.task.dag.fail_stop
+            fail_fast = self.task.dag.fail_fast
         except Exception:
-            fail_stop = False
+            fail_fast = False
         _handle_failure(
             task_instance=self,
             error=error,
             session=session,
             test_mode=test_mode,
             context=context,
             force_fail=force_fail,
-            fail_stop=fail_stop,
+            fail_fast=fail_fast,
         )
 
     def is_eligible_to_retry(self):
12 changes: 6 additions & 6 deletions task_sdk/src/airflow/sdk/definitions/dag.py
@@ -47,7 +47,7 @@
 from airflow import settings
 from airflow.exceptions import (
     DuplicateTaskIdFound,
-    FailStopDagInvalidTriggerRule,
+    FailFastDagInvalidTriggerRule,
     ParamValidationError,
     TaskNotFound,
 )
@@ -346,7 +346,7 @@ class DAG:
     Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link.
     e.g: {"dag_owner": "https://airflow.apache.org/"}
     :param auto_register: Automatically register this DAG when it is used in a ``with`` block
-    :param fail_stop: Fails currently running tasks when task in DAG fails.
+    :param fail_fast: Fails currently running tasks when task in DAG fails.
         **Warning**: A fail stop dag can only have tasks with the default trigger rule ("all_success").
         An exception will be thrown if any task in a fail stop dag has a non default trigger rule.
     :param dag_display_name: The display name of the DAG which appears on the UI.
@@ -413,7 +413,7 @@ class DAG:
     tags: MutableSet[str] = attrs.field(factory=set, converter=_convert_tags)
     owner_links: dict[str, str] = attrs.field(factory=dict)
     auto_register: bool = attrs.field(default=True, converter=bool)
-    fail_stop: bool = attrs.field(default=False, converter=bool)
+    fail_fast: bool = attrs.field(default=False, converter=bool)
     dag_display_name: str = attrs.field(
         default=attrs.Factory(_default_dag_display_name, takes_self=True),
         validator=attrs.validators.instance_of(str),
@@ -928,7 +928,7 @@ def add_task(self, task: Operator) -> None:
         # Add task_id to used_group_ids to prevent group_id and task_id collisions.
         self.task_group.used_group_ids.add(task_id)
 
-        FailStopDagInvalidTriggerRule.check(fail_stop=self.fail_stop, trigger_rule=task.trigger_rule)
+        FailFastDagInvalidTriggerRule.check(fail_fast=self.fail_fast, trigger_rule=task.trigger_rule)
 
     def add_tasks(self, tasks: Iterable[Operator]) -> None:
         """
@@ -1022,7 +1022,7 @@ def _validate_owner_links(self, _, owner_links):
         "has_on_success_callback",
         "has_on_failure_callback",
         "auto_register",
-        "fail_stop",
+        "fail_fast",
         "schedule",
     }
@@ -1058,7 +1058,7 @@ def dag(
     tags: Collection[str] | None = None,
     owner_links: dict[str, str] | None = None,
     auto_register: bool = True,
-    fail_stop: bool = False,
+    fail_fast: bool = False,
     dag_display_name: str | None = None,
 ) -> Callable[[Callable], Callable[..., DAG]]:
     """
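The same keyword flows through the dag decorator shown above; a minimal sketch (the airflow.decorators import is an assumption about the public wrapper, which is expected to forward its arguments to this signature; the task body is illustrative):

    from airflow.decorators import dag, task

    @dag(schedule=None, fail_fast=True)
    def fail_fast_pipeline():
        @task
        def step():
            return "ok"

        step()

    fail_fast_pipeline()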
18 changes: 9 additions & 9 deletions tests/models/test_dag.py
@@ -1428,40 +1428,40 @@ def test_create_dagrun_job_id_is_set(self):
 
     def test_dag_add_task_checks_trigger_rule(self):
         # A non fail stop dag should allow any trigger rule
-        from airflow.exceptions import FailStopDagInvalidTriggerRule
+        from airflow.exceptions import FailFastDagInvalidTriggerRule
         from airflow.utils.trigger_rule import TriggerRule
 
         task_with_non_default_trigger_rule = EmptyOperator(
             task_id="task_with_non_default_trigger_rule", trigger_rule=TriggerRule.ALWAYS
         )
-        non_fail_stop_dag = DAG(
+        non_fail_fast_dag = DAG(
             dag_id="test_dag_add_task_checks_trigger_rule",
             schedule=None,
             start_date=DEFAULT_DATE,
-            fail_stop=False,
+            fail_fast=False,
         )
-        non_fail_stop_dag.add_task(task_with_non_default_trigger_rule)
+        non_fail_fast_dag.add_task(task_with_non_default_trigger_rule)
 
         # a fail stop dag should allow default trigger rule
         from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE
 
-        fail_stop_dag = DAG(
+        fail_fast_dag = DAG(
             dag_id="test_dag_add_task_checks_trigger_rule",
             schedule=None,
             start_date=DEFAULT_DATE,
-            fail_stop=True,
+            fail_fast=True,
         )
         task_with_default_trigger_rule = EmptyOperator(
             task_id="task_with_default_trigger_rule", trigger_rule=DEFAULT_TRIGGER_RULE
         )
-        fail_stop_dag.add_task(task_with_default_trigger_rule)
+        fail_fast_dag.add_task(task_with_default_trigger_rule)
 
         # a fail stop dag should not allow a non-default trigger rule
         task_with_non_default_trigger_rule = EmptyOperator(
             task_id="task_with_non_default_trigger_rule", trigger_rule=TriggerRule.ALWAYS
         )
-        with pytest.raises(FailStopDagInvalidTriggerRule):
-            fail_stop_dag.add_task(task_with_non_default_trigger_rule)
+        with pytest.raises(FailFastDagInvalidTriggerRule):
+            fail_fast_dag.add_task(task_with_non_default_trigger_rule)
 
     def test_dag_add_task_sets_default_task_group(self):
         dag = DAG(dag_id="test_dag_add_task_sets_default_task_group", schedule=None, start_date=DEFAULT_DATE)
6 changes: 3 additions & 3 deletions tests/models/test_taskinstance.py
@@ -3646,19 +3646,19 @@ def test_handle_failure_task_undefined(self, create_task_instance):
         ti.handle_failure("test ti.task undefined")
 
     @provide_session
-    def test_handle_failure_fail_stop(self, create_dummy_dag, session=None):
+    def test_handle_failure_fail_fast(self, create_dummy_dag, session=None):
         start_date = timezone.datetime(2016, 6, 1)
         clear_db_runs()
 
         dag, task1 = create_dummy_dag(
-            dag_id="test_handle_failure_fail_stop",
+            dag_id="test_handle_failure_fail_fast",
             schedule=None,
             start_date=start_date,
             task_id="task1",
             trigger_rule="all_success",
             with_dagrun_type=DagRunType.MANUAL,
             session=session,
-            fail_stop=True,
+            fail_fast=True,
         )
         logical_date = timezone.utcnow()
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
