Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename fail stop dag property to fail fast #45327

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/news-fragment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ jobs:
BASE_REF: ${{ github.base_ref }}
run: >
change_types=(
'DAG changes'
'Dag changes'
'Config changes'
'API changes'
'CLI changes'
'Behaviour changes'
'Plugin changes'
'Dependency change'
'Dependency changes'
)
news_fragment_content=`git diff origin/${BASE_REF} newsfragments/*.significant.rst`
Expand Down
12 changes: 6 additions & 6 deletions airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,23 +289,23 @@ def __init__(self, *args, **kwargs):
warnings.warn("DagFileExists is deprecated and will be removed.", DeprecationWarning, stacklevel=2)


class FailStopDagInvalidTriggerRule(AirflowException):
"""Raise when a dag has 'fail_stop' enabled yet has a non-default trigger rule."""
class FailFastDagInvalidTriggerRule(AirflowException):
"""Raise when a dag has 'fail_fast' enabled yet has a non-default trigger rule."""

_allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS)

@classmethod
def check(cls, *, fail_stop: bool, trigger_rule: TriggerRule):
def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule):
"""
Check that fail_stop dag tasks have allowable trigger rules.
Check that fail_fast dag tasks have allowable trigger rules.

:meta private:
"""
if fail_stop and trigger_rule not in cls._allowed_rules:
if fail_fast and trigger_rule not in cls._allowed_rules:
raise cls()

def __str__(self) -> str:
return f"A 'fail-stop' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule"
return f"A 'fail_fast' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule"


class DuplicateTaskIdFound(AirflowException):
Expand Down
6 changes: 3 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,9 @@ class DAG(TaskSDKDag, LoggingMixin):
Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link.
e.g: {"dag_owner": "https://airflow.apache.org/"}
:param auto_register: Automatically register this DAG when it is used in a ``with`` block
:param fail_stop: Fails currently running tasks when task in DAG fails.
**Warning**: A fail stop dag can only have tasks with the default trigger rule ("all_success").
An exception will be thrown if any task in a fail stop dag has a non default trigger rule.
:param fail_fast: Fails currently running tasks when task in DAG fails.
**Warning**: A fail fast dag can only have tasks with the default trigger rule ("all_success").
An exception will be thrown if any task in a fail fast dag has a non default trigger rule.
:param dag_display_name: The display name of the DAG which appears on the UI.
"""

Expand Down
20 changes: 10 additions & 10 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,10 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance, session: Session):
task = task_instance.task.dag.task_dict[ti.task_id]
if not task.is_teardown:
if ti.state == TaskInstanceState.RUNNING:
log.info("Forcing task %s to fail due to dag's `fail_stop` setting", ti.task_id)
log.info("Forcing task %s to fail due to dag's `fail_fast` setting", ti.task_id)
ti.error(session)
else:
log.info("Setting task %s to SKIPPED due to dag's `fail_stop` setting.", ti.task_id)
log.info("Setting task %s to SKIPPED due to dag's `fail_fast` setting.", ti.task_id)
ti.set_state(state=TaskInstanceState.SKIPPED, session=session)
else:
log.info("Not skipping teardown task '%s'", ti.task_id)
Expand Down Expand Up @@ -1083,7 +1083,7 @@ def _handle_failure(
test_mode: bool | None = None,
context: Context | None = None,
force_fail: bool = False,
fail_stop: bool = False,
fail_fast: bool = False,
) -> None:
"""
Handle Failure for a task instance.
Expand All @@ -1106,7 +1106,7 @@ def _handle_failure(
context=context,
force_fail=force_fail,
session=session,
fail_stop=fail_stop,
fail_fast=fail_fast,
)

_log_state(task_instance=task_instance, lead_msg="Immediate failure requested. " if force_fail else "")
Expand Down Expand Up @@ -3065,7 +3065,7 @@ def fetch_handle_failure_context(
force_fail: bool = False,
*,
session: Session,
fail_stop: bool = False,
fail_fast: bool = False,
):
"""
Fetch the context needed to handle a failure.
Expand All @@ -3076,7 +3076,7 @@ def fetch_handle_failure_context(
:param context: Jinja2 context
:param force_fail: if True, task does not retry
:param session: SQLAlchemy ORM Session
:param fail_stop: if True, fail all downstream tasks
:param fail_fast: if True, fail all downstream tasks
"""
if error:
if isinstance(error, BaseException):
Expand Down Expand Up @@ -3133,7 +3133,7 @@ def fetch_handle_failure_context(
email_for_state = operator.attrgetter("email_on_failure")
callbacks = task.on_failure_callback if task else None

if task and fail_stop:
if task and fail_fast:
_stop_remaining_tasks(task_instance=ti, session=session)
else:
if ti.state == TaskInstanceState.RUNNING:
Expand Down Expand Up @@ -3190,17 +3190,17 @@ def handle_failure(
assert self.task
assert self.task.dag
try:
fail_stop = self.task.dag.fail_stop
fail_fast = self.task.dag.fail_fast
except Exception:
fail_stop = False
fail_fast = False
_handle_failure(
task_instance=self,
error=error,
session=session,
test_mode=test_mode,
context=context,
force_fail=force_fail,
fail_stop=fail_stop,
fail_fast=fail_fast,
)

def is_eligible_to_retry(self):
Expand Down
23 changes: 23 additions & 0 deletions newsfragments/45327.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Renamed DAG argument ``fail_stop`` to ``fail_fast`` across the codebase to align with Airflow 3.0.


* Types of change

* [x] Dag changes
* [ ] Config changes
* [ ] API changes
* [ ] CLI changes
* [ ] Behaviour changes
* [ ] Plugin changes
* [ ] Dependency changes
* [ ] Code interface changes

* Migration rules needed

* ruff

* AIR302

* arguments in ``DAG``

* [ ] ``fail_stop`` → ``fail_fast``
15 changes: 7 additions & 8 deletions task_sdk/src/airflow/sdk/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from airflow import settings
from airflow.exceptions import (
DuplicateTaskIdFound,
FailStopDagInvalidTriggerRule,
FailFastDagInvalidTriggerRule,
ParamValidationError,
TaskNotFound,
)
Expand Down Expand Up @@ -346,7 +346,7 @@ class DAG:
Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link.
e.g: {"dag_owner": "https://airflow.apache.org/"}
:param auto_register: Automatically register this DAG when it is used in a ``with`` block
:param fail_stop: Fails currently running tasks when task in DAG fails.
:param fail_fast: Fails currently running tasks when task in DAG fails.
**Warning**: A fail stop dag can only have tasks with the default trigger rule ("all_success").
An exception will be thrown if any task in a fail stop dag has a non default trigger rule.
:param dag_display_name: The display name of the DAG which appears on the UI.
Expand Down Expand Up @@ -413,7 +413,7 @@ class DAG:
tags: MutableSet[str] = attrs.field(factory=set, converter=_convert_tags)
owner_links: dict[str, str] = attrs.field(factory=dict)
auto_register: bool = attrs.field(default=True, converter=bool)
fail_stop: bool = attrs.field(default=False, converter=bool)
fail_fast: bool = attrs.field(default=False, converter=bool)
dag_display_name: str = attrs.field(
default=attrs.Factory(_default_dag_display_name, takes_self=True),
validator=attrs.validators.instance_of(str),
Expand Down Expand Up @@ -497,8 +497,7 @@ def _default_timetable(instance: DAG):
elif isinstance(schedule, Collection) and not isinstance(schedule, str):
if not all(isinstance(x, BaseAsset) for x in schedule):
raise ValueError(
"All elements in 'schedule' should be either assets, "
"asset references, or asset aliases"
"All elements in 'schedule' should be either assets, asset references, or asset aliases"
)
return AssetTriggeredTimetable(AssetAll(*schedule))
else:
Expand Down Expand Up @@ -928,7 +927,7 @@ def add_task(self, task: Operator) -> None:
# Add task_id to used_group_ids to prevent group_id and task_id collisions.
self.task_group.used_group_ids.add(task_id)

FailStopDagInvalidTriggerRule.check(fail_stop=self.fail_stop, trigger_rule=task.trigger_rule)
FailFastDagInvalidTriggerRule.check(fail_fast=self.fail_fast, trigger_rule=task.trigger_rule)

def add_tasks(self, tasks: Iterable[Operator]) -> None:
"""
Expand Down Expand Up @@ -1022,7 +1021,7 @@ def _validate_owner_links(self, _, owner_links):
"has_on_success_callback",
"has_on_failure_callback",
"auto_register",
"fail_stop",
"fail_fast",
"schedule",
}

Expand Down Expand Up @@ -1058,7 +1057,7 @@ def dag(
tags: Collection[str] | None = None,
owner_links: dict[str, str] | None = None,
auto_register: bool = True,
fail_stop: bool = False,
fail_fast: bool = False,
dag_display_name: str | None = None,
) -> Callable[[Callable], Callable[..., DAG]]:
"""
Expand Down
18 changes: 9 additions & 9 deletions tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,24 @@ class TestBaseOperator:
def test_trigger_rule_validation(self):
from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE

fail_stop_dag = DAG(
fail_fast_dag = DAG(
dag_id="test_dag_trigger_rule_validation",
schedule=None,
start_date=DEFAULT_DATE,
fail_stop=True,
fail_fast=True,
)
non_fail_stop_dag = DAG(
non_fail_fast_dag = DAG(
dag_id="test_dag_trigger_rule_validation",
schedule=None,
start_date=DEFAULT_DATE,
fail_stop=False,
fail_fast=False,
)

# An operator with default trigger rule and a fail-stop dag should be allowed
BaseOperator(task_id="test_valid_trigger_rule", dag=fail_stop_dag, trigger_rule=DEFAULT_TRIGGER_RULE)
BaseOperator(task_id="test_valid_trigger_rule", dag=fail_fast_dag, trigger_rule=DEFAULT_TRIGGER_RULE)
# An operator with non default trigger rule and a non fail-stop dag should be allowed
BaseOperator(
task_id="test_valid_trigger_rule", dag=non_fail_stop_dag, trigger_rule=TriggerRule.ALWAYS
task_id="test_valid_trigger_rule", dag=non_fail_fast_dag, trigger_rule=TriggerRule.ALWAYS
)

def test_cross_downstream(self):
Expand Down Expand Up @@ -454,13 +454,13 @@ def get_states(dr):


@pytest.mark.db_test
def test_teardown_and_fail_stop(dag_maker):
def test_teardown_and_fail_fast(dag_maker):
"""
when fail_stop enabled, teardowns should run according to their setups.
when fail_fast enabled, teardowns should run according to their setups.
in this case, the second teardown skips because its setup skips.
"""

with dag_maker(fail_stop=True) as dag:
with dag_maker(fail_fast=True) as dag:
for num in (1, 2):
with TaskGroup(f"tg_{num}"):

Expand Down
18 changes: 9 additions & 9 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1428,40 +1428,40 @@ def test_create_dagrun_job_id_is_set(self):

def test_dag_add_task_checks_trigger_rule(self):
# A non fail stop dag should allow any trigger rule
from airflow.exceptions import FailStopDagInvalidTriggerRule
from airflow.exceptions import FailFastDagInvalidTriggerRule
from airflow.utils.trigger_rule import TriggerRule

task_with_non_default_trigger_rule = EmptyOperator(
task_id="task_with_non_default_trigger_rule", trigger_rule=TriggerRule.ALWAYS
)
non_fail_stop_dag = DAG(
non_fail_fast_dag = DAG(
dag_id="test_dag_add_task_checks_trigger_rule",
schedule=None,
start_date=DEFAULT_DATE,
fail_stop=False,
fail_fast=False,
)
non_fail_stop_dag.add_task(task_with_non_default_trigger_rule)
non_fail_fast_dag.add_task(task_with_non_default_trigger_rule)

# a fail stop dag should allow default trigger rule
from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE

fail_stop_dag = DAG(
fail_fast_dag = DAG(
dag_id="test_dag_add_task_checks_trigger_rule",
schedule=None,
start_date=DEFAULT_DATE,
fail_stop=True,
fail_fast=True,
)
task_with_default_trigger_rule = EmptyOperator(
task_id="task_with_default_trigger_rule", trigger_rule=DEFAULT_TRIGGER_RULE
)
fail_stop_dag.add_task(task_with_default_trigger_rule)
fail_fast_dag.add_task(task_with_default_trigger_rule)

# a fail stop dag should not allow a non-default trigger rule
task_with_non_default_trigger_rule = EmptyOperator(
task_id="task_with_non_default_trigger_rule", trigger_rule=TriggerRule.ALWAYS
)
with pytest.raises(FailStopDagInvalidTriggerRule):
fail_stop_dag.add_task(task_with_non_default_trigger_rule)
with pytest.raises(FailFastDagInvalidTriggerRule):
fail_fast_dag.add_task(task_with_non_default_trigger_rule)

def test_dag_add_task_sets_default_task_group(self):
dag = DAG(dag_id="test_dag_add_task_sets_default_task_group", schedule=None, start_date=DEFAULT_DATE)
Expand Down
18 changes: 9 additions & 9 deletions tests/models/test_mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1580,12 +1580,12 @@ def my_work(val):
}
assert states == expected

def test_one_to_many_with_teardown_and_fail_stop(self, dag_maker):
def test_one_to_many_with_teardown_and_fail_fast(self, dag_maker):
"""
With fail_stop enabled, the teardown for an already-completed setup
With fail_fast enabled, the teardown for an already-completed setup
should not be skipped.
"""
with dag_maker(fail_stop=True) as dag:
with dag_maker(fail_fast=True) as dag:

@task
def my_setup():
Expand Down Expand Up @@ -1616,12 +1616,12 @@ def my_teardown(val):
}
assert states == expected

def test_one_to_many_with_teardown_and_fail_stop_more_tasks(self, dag_maker):
def test_one_to_many_with_teardown_and_fail_fast_more_tasks(self, dag_maker):
"""
when fail_stop enabled, teardowns should run according to their setups.
when fail_fast enabled, teardowns should run according to their setups.
in this case, the second teardown skips because its setup skips.
"""
with dag_maker(fail_stop=True) as dag:
with dag_maker(fail_fast=True) as dag:
for num in (1, 2):
with TaskGroup(f"tg_{num}"):

Expand Down Expand Up @@ -1658,12 +1658,12 @@ def my_teardown(val):
}
assert states == expected

def test_one_to_many_with_teardown_and_fail_stop_more_tasks_mapped_setup(self, dag_maker):
def test_one_to_many_with_teardown_and_fail_fast_more_tasks_mapped_setup(self, dag_maker):
"""
when fail_stop enabled, teardowns should run according to their setups.
when fail_fast enabled, teardowns should run according to their setups.
in this case, the second teardown skips because its setup skips.
"""
with dag_maker(fail_stop=True) as dag:
with dag_maker(fail_fast=True) as dag:
for num in (1, 2):
with TaskGroup(f"tg_{num}"):

Expand Down
6 changes: 3 additions & 3 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3649,19 +3649,19 @@ def test_handle_failure_task_undefined(self, create_task_instance):
ti.handle_failure("test ti.task undefined")

@provide_session
def test_handle_failure_fail_stop(self, create_dummy_dag, session=None):
def test_handle_failure_fail_fast(self, create_dummy_dag, session=None):
start_date = timezone.datetime(2016, 6, 1)
clear_db_runs()

dag, task1 = create_dummy_dag(
dag_id="test_handle_failure_fail_stop",
dag_id="test_handle_failure_fail_fast",
schedule=None,
start_date=start_date,
task_id="task1",
trigger_rule="all_success",
with_dagrun_type=DagRunType.MANUAL,
session=session,
fail_stop=True,
fail_fast=True,
)
logical_date = timezone.utcnow()
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
Expand Down