From d108d45b0902a18d74a43e8b496a214a5ba7e43e Mon Sep 17 00:00:00 2001 From: hprassad Date: Wed, 1 Jan 2025 00:12:11 +0530 Subject: [PATCH] Renamed fail_stop DAG property to fail_fast (#45229) --- airflow/exceptions.py | 12 ++++++------ airflow/models/dag.py | 6 +++--- airflow/models/taskinstance.py | 20 ++++++++++---------- task_sdk/src/airflow/sdk/definitions/dag.py | 12 ++++++------ tests/models/test_dag.py | 18 +++++++++--------- tests/models/test_taskinstance.py | 6 +++--- 6 files changed, 37 insertions(+), 37 deletions(-) diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 5e32c00c7d5da..1f88bf817d0a8 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -293,23 +293,23 @@ def __init__(self, *args, **kwargs): warnings.warn("DagFileExists is deprecated and will be removed.", DeprecationWarning, stacklevel=2) -class FailStopDagInvalidTriggerRule(AirflowException): - """Raise when a dag has 'fail_stop' enabled yet has a non-default trigger rule.""" +class FailFastDagInvalidTriggerRule(AirflowException): + """Raise when a dag has 'fail_fast' enabled yet has a non-default trigger rule.""" _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS) @classmethod - def check(cls, *, fail_stop: bool, trigger_rule: TriggerRule): + def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule): """ - Check that fail_stop dag tasks have allowable trigger rules. + Check that fail_fast dag tasks have allowable trigger rules. :meta private: """ - if fail_stop and trigger_rule not in cls._allowed_rules: + if fail_fast and trigger_rule not in cls._allowed_rules: raise cls() def __str__(self) -> str: - return f"A 'fail-stop' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule" + return f"A 'fail_fast' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule" class DuplicateTaskIdFound(AirflowException): diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 4abf24af52537..2632f36bb15e8 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -418,9 +418,9 @@ class DAG(TaskSDKDag, LoggingMixin): Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link. e.g: {"dag_owner": "https://airflow.apache.org/"} :param auto_register: Automatically register this DAG when it is used in a ``with`` block - :param fail_stop: Fails currently running tasks when task in DAG fails. - **Warning**: A fail stop dag can only have tasks with the default trigger rule ("all_success"). - An exception will be thrown if any task in a fail stop dag has a non default trigger rule. + :param fail_fast: Fails currently running tasks when task in DAG fails. + **Warning**: A fail fast dag can only have tasks with the default trigger rule ("all_success"). + An exception will be thrown if any task in a fail fast dag has a non default trigger rule. :param dag_display_name: The display name of the DAG which appears on the UI. """ diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index a5e50cb0d2cdb..647c27f3a3833 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -408,10 +408,10 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance, session: Session): task = task_instance.task.dag.task_dict[ti.task_id] if not task.is_teardown: if ti.state == TaskInstanceState.RUNNING: - log.info("Forcing task %s to fail due to dag's `fail_stop` setting", ti.task_id) + log.info("Forcing task %s to fail due to dag's `fail_fast` setting", ti.task_id) ti.error(session) else: - log.info("Setting task %s to SKIPPED due to dag's `fail_stop` setting.", ti.task_id) + log.info("Setting task %s to SKIPPED due to dag's `fail_fast` setting.", ti.task_id) ti.set_state(state=TaskInstanceState.SKIPPED, session=session) else: log.info("Not skipping teardown task '%s'", ti.task_id) @@ -1082,7 +1082,7 @@ def _handle_failure( test_mode: bool | None = None, context: Context | None = None, force_fail: bool = False, - fail_stop: bool = False, + fail_fast: bool = False, ) -> None: """ Handle Failure for a task instance. @@ -1105,7 +1105,7 @@ def _handle_failure( context=context, force_fail=force_fail, session=session, - fail_stop=fail_stop, + fail_fast=fail_fast, ) _log_state(task_instance=task_instance, lead_msg="Immediate failure requested. " if force_fail else "") @@ -3048,7 +3048,7 @@ def fetch_handle_failure_context( force_fail: bool = False, *, session: Session, - fail_stop: bool = False, + fail_fast: bool = False, ): """ Fetch the context needed to handle a failure. @@ -3059,7 +3059,7 @@ def fetch_handle_failure_context( :param context: Jinja2 context :param force_fail: if True, task does not retry :param session: SQLAlchemy ORM Session - :param fail_stop: if True, fail all downstream tasks + :param fail_fast: if True, fail all downstream tasks """ if error: if isinstance(error, BaseException): @@ -3116,7 +3116,7 @@ def fetch_handle_failure_context( email_for_state = operator.attrgetter("email_on_failure") callbacks = task.on_failure_callback if task else None - if task and fail_stop: + if task and fail_fast: _stop_remaining_tasks(task_instance=ti, session=session) else: if ti.state == TaskInstanceState.RUNNING: @@ -3173,9 +3173,9 @@ def handle_failure( assert self.task assert self.task.dag try: - fail_stop = self.task.dag.fail_stop + fail_fast = self.task.dag.fail_fast except Exception: - fail_stop = False + fail_fast = False _handle_failure( task_instance=self, error=error, @@ -3183,7 +3183,7 @@ def handle_failure( test_mode=test_mode, context=context, force_fail=force_fail, - fail_stop=fail_stop, + fail_fast=fail_fast, ) def is_eligible_to_retry(self): diff --git a/task_sdk/src/airflow/sdk/definitions/dag.py b/task_sdk/src/airflow/sdk/definitions/dag.py index f49e42aa0eb30..c3bffe8fc8fba 100644 --- a/task_sdk/src/airflow/sdk/definitions/dag.py +++ b/task_sdk/src/airflow/sdk/definitions/dag.py @@ -47,7 +47,7 @@ from airflow import settings from airflow.exceptions import ( DuplicateTaskIdFound, - FailStopDagInvalidTriggerRule, + FailFastDagInvalidTriggerRule, ParamValidationError, TaskNotFound, ) @@ -346,7 +346,7 @@ class DAG: Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link. e.g: {"dag_owner": "https://airflow.apache.org/"} :param auto_register: Automatically register this DAG when it is used in a ``with`` block - :param fail_stop: Fails currently running tasks when task in DAG fails. + :param fail_fast: Fails currently running tasks when task in DAG fails. **Warning**: A fail stop dag can only have tasks with the default trigger rule ("all_success"). An exception will be thrown if any task in a fail stop dag has a non default trigger rule. :param dag_display_name: The display name of the DAG which appears on the UI. @@ -413,7 +413,7 @@ class DAG: tags: MutableSet[str] = attrs.field(factory=set, converter=_convert_tags) owner_links: dict[str, str] = attrs.field(factory=dict) auto_register: bool = attrs.field(default=True, converter=bool) - fail_stop: bool = attrs.field(default=False, converter=bool) + fail_fast: bool = attrs.field(default=False, converter=bool) dag_display_name: str = attrs.field( default=attrs.Factory(_default_dag_display_name, takes_self=True), validator=attrs.validators.instance_of(str), @@ -928,7 +928,7 @@ def add_task(self, task: Operator) -> None: # Add task_id to used_group_ids to prevent group_id and task_id collisions. self.task_group.used_group_ids.add(task_id) - FailStopDagInvalidTriggerRule.check(fail_stop=self.fail_stop, trigger_rule=task.trigger_rule) + FailFastDagInvalidTriggerRule.check(fail_fast=self.fail_fast, trigger_rule=task.trigger_rule) def add_tasks(self, tasks: Iterable[Operator]) -> None: """ @@ -1022,7 +1022,7 @@ def _validate_owner_links(self, _, owner_links): "has_on_success_callback", "has_on_failure_callback", "auto_register", - "fail_stop", + "fail_fast", "schedule", } @@ -1058,7 +1058,7 @@ def dag( tags: Collection[str] | None = None, owner_links: dict[str, str] | None = None, auto_register: bool = True, - fail_stop: bool = False, + fail_fast: bool = False, dag_display_name: str | None = None, ) -> Callable[[Callable], Callable[..., DAG]]: """ diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 53090f6225bc8..3757262bc864b 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -1428,40 +1428,40 @@ def test_create_dagrun_job_id_is_set(self): def test_dag_add_task_checks_trigger_rule(self): # A non fail stop dag should allow any trigger rule - from airflow.exceptions import FailStopDagInvalidTriggerRule + from airflow.exceptions import FailFastDagInvalidTriggerRule from airflow.utils.trigger_rule import TriggerRule task_with_non_default_trigger_rule = EmptyOperator( task_id="task_with_non_default_trigger_rule", trigger_rule=TriggerRule.ALWAYS ) - non_fail_stop_dag = DAG( + non_fail_fast_dag = DAG( dag_id="test_dag_add_task_checks_trigger_rule", schedule=None, start_date=DEFAULT_DATE, - fail_stop=False, + fail_fast=False, ) - non_fail_stop_dag.add_task(task_with_non_default_trigger_rule) + non_fail_fast_dag.add_task(task_with_non_default_trigger_rule) # a fail stop dag should allow default trigger rule from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE - fail_stop_dag = DAG( + fail_fast_dag = DAG( dag_id="test_dag_add_task_checks_trigger_rule", schedule=None, start_date=DEFAULT_DATE, - fail_stop=True, + fail_fast=True, ) task_with_default_trigger_rule = EmptyOperator( task_id="task_with_default_trigger_rule", trigger_rule=DEFAULT_TRIGGER_RULE ) - fail_stop_dag.add_task(task_with_default_trigger_rule) + fail_fast_dag.add_task(task_with_default_trigger_rule) # a fail stop dag should not allow a non-default trigger rule task_with_non_default_trigger_rule = EmptyOperator( task_id="task_with_non_default_trigger_rule", trigger_rule=TriggerRule.ALWAYS ) - with pytest.raises(FailStopDagInvalidTriggerRule): - fail_stop_dag.add_task(task_with_non_default_trigger_rule) + with pytest.raises(FailFastDagInvalidTriggerRule): + fail_fast_dag.add_task(task_with_non_default_trigger_rule) def test_dag_add_task_sets_default_task_group(self): dag = DAG(dag_id="test_dag_add_task_sets_default_task_group", schedule=None, start_date=DEFAULT_DATE) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 73f5908b707cf..3b6047d0334bf 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -3646,19 +3646,19 @@ def test_handle_failure_task_undefined(self, create_task_instance): ti.handle_failure("test ti.task undefined") @provide_session - def test_handle_failure_fail_stop(self, create_dummy_dag, session=None): + def test_handle_failure_fail_fast(self, create_dummy_dag, session=None): start_date = timezone.datetime(2016, 6, 1) clear_db_runs() dag, task1 = create_dummy_dag( - dag_id="test_handle_failure_fail_stop", + dag_id="test_handle_failure_fail_fast", schedule=None, start_date=start_date, task_id="task1", trigger_rule="all_success", with_dagrun_type=DagRunType.MANUAL, session=session, - fail_stop=True, + fail_fast=True, ) logical_date = timezone.utcnow() triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}