Replace external_trigger check with DagRunType #45961

Open · wants to merge 4 commits into base: main
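This PR removes the boolean external_trigger column from DagRun and derives the same information from DagRun.run_type. A minimal sketch of the substitution applied throughout the diff below; the helper name is illustrative and not part of the change:

from airflow.utils.types import DagRunType

def is_manually_triggered(dag_run) -> bool:
    # Old check: dag_run.external_trigger, a separate boolean column.
    # New check: infer manual triggering from the run type instead.
    return dag_run.run_type == DagRunType.MANUAL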
1 change: 0 additions & 1 deletion airflow/api/client/local_client.py
@@ -54,7 +54,6 @@ def trigger_dag(
"data_interval_start": dag_run.data_interval_start,
"data_interval_end": dag_run.data_interval_end,
"end_date": dag_run.end_date,
"external_trigger": dag_run.external_trigger,
"last_scheduling_decision": dag_run.last_scheduling_decision,
"logical_date": dag_run.logical_date,
"run_type": dag_run.run_type,
2 changes: 1 addition & 1 deletion airflow/api/common/mark_tasks.py
@@ -141,7 +141,7 @@ def find_task_relatives(tasks, downstream, upstream):
@provide_session
def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASession = NEW_SESSION):
"""Return DAG executions' run_ids."""
last_dagrun = dag.get_last_dagrun(include_externally_triggered=True, session=session)
last_dagrun = dag.get_last_dagrun(include_manually_triggered=True, session=session)
current_dagrun = dag.get_dagrun(run_id=run_id, session=session)
first_dagrun = session.scalar(
select(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.logical_date.asc()).limit(1)
1 change: 0 additions & 1 deletion airflow/api/common/trigger_dag.py
@@ -105,7 +105,6 @@ def _trigger_dag(
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=triggered_by,
external_trigger=True,
dag_version=dag_version,
state=DagRunState.QUEUED,
session=session,
2 changes: 0 additions & 2 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -186,7 +186,6 @@ def _fetch_dag_runs(
"start_date",
"end_date",
"updated_at",
"external_trigger",
"conf",
]
query = apply_sorting(query, order_by, to_replace, allowed_sort_attrs)
@@ -354,7 +353,6 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
conf=post_body.get("conf"),
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
external_trigger=True,
dag_version=DagVersion.get_latest_version(dag.dag_id),
state=DagRunState.QUEUED,
session=session,
1 change: 0 additions & 1 deletion airflow/api_connexion/schemas/dag_run_schema.py
@@ -67,7 +67,6 @@ class Meta:
start_date = auto_field(dump_only=True)
end_date = auto_field(dump_only=True)
state = DagStateField(dump_only=True)
external_trigger = auto_field(dump_default=True, dump_only=True)
conf = ConfObject()
data_interval_start = auto_field(validate=validate_istimezone)
data_interval_end = auto_field(validate=validate_istimezone)
1 change: 0 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -65,7 +65,6 @@ class DAGRunResponse(BaseModel):
last_scheduling_decision: datetime | None
run_type: DagRunType
state: DagRunState
external_trigger: bool
triggered_by: DagRunTriggeredByType
conf: dict
note: str | None
4 changes: 0 additions & 4 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -7656,9 +7656,6 @@ components:
$ref: '#/components/schemas/DagRunType'
state:
$ref: '#/components/schemas/DagRunState'
external_trigger:
type: boolean
title: External Trigger
triggered_by:
$ref: '#/components/schemas/DagRunTriggeredByType'
conf:
@@ -7682,7 +7679,6 @@
- last_scheduling_decision
- run_type
- state
- external_trigger
- triggered_by
- conf
- note
3 changes: 0 additions & 3 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -282,7 +282,6 @@ def get_dag_runs(
"start_date",
"end_date",
"updated_at",
"external_trigger",
"conf",
],
DagRun,
@@ -375,7 +374,6 @@ def trigger_dag_run(
conf=body.conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
external_trigger=True,
dag_version=DagVersion.get_latest_version(dag.dag_id),
state=DagRunState.QUEUED,
session=session,
@@ -424,7 +422,6 @@ def get_list_dag_runs_batch(
"start_date",
"end_date",
"updated_at",
"external_trigger",
"conf",
],
DagRun,
@@ -223,7 +223,6 @@ class DagRun(BaseModel):
end_date: UtcDateTime | None
run_type: DagRunType
conf: Annotated[dict[str, Any], Field(default_factory=dict)]
external_trigger: bool = False


class TIRunContext(BaseModel):
1 change: 0 additions & 1 deletion airflow/cli/commands/remote_commands/task_command.py
@@ -174,7 +174,6 @@ def _get_dag_run(
dag_id=dag.dag_id,
run_id=logical_date_or_run_id,
run_type=DagRunType.MANUAL,
external_trigger=True,
logical_date=dag_run_logical_date,
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
triggered_by=DagRunTriggeredByType.CLI,
4 changes: 2 additions & 2 deletions airflow/example_dags/plugins/event_listener.py
@@ -147,9 +147,9 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
print("Dag run in failure state")
dag_id = dag_run.dag_id
run_id = dag_run.run_id
external_trigger = dag_run.external_trigger
run_type = dag_run.run_type

print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
print(f"Dag information:{dag_id} Run id: {run_id} Run type: {run_type}")
print(f"Failed with message: {msg}")


1 change: 0 additions & 1 deletion airflow/exceptions.py
@@ -270,7 +270,6 @@ def serialize(self):
state=self.dag_run.state,
dag_id=self.dag_run.dag_id,
run_id=self.dag_run.run_id,
external_trigger=self.dag_run.external_trigger,
run_type=self.dag_run.run_type,
)
dag_run.id = self.dag_run.id
4 changes: 2 additions & 2 deletions airflow/jobs/scheduler_job_runner.py
@@ -1476,14 +1476,14 @@ def _update_state(dag: DAG, dag_run: DagRun):

dag_run.state = DagRunState.RUNNING
dag_run.start_date = timezone.utcnow()
if dag.timetable.periodic and not dag_run.external_trigger and dag_run.clear_number < 1:
if dag.timetable.periodic and dag_run.run_type != DagRunType.MANUAL and dag_run.clear_number < 1:
# TODO: Logically, this should be DagRunInfo.run_after, but the
# information is not stored on a DagRun, only before the actual
# execution on DagModel.next_dagrun_create_after. We should add
# a field on DagRun for this instead of relying on the run
# always happening immediately after the data interval.
# We only publish these metrics for scheduled dag runs and only
# when ``external_trigger`` is *False* and ``clear_number`` is 0.
# when ``run_type`` is not *MANUAL* and ``clear_number`` is 0.
expected_start_date = dag.get_run_data_interval(dag_run).end
schedule_delay = dag_run.start_date - expected_start_date
# Publish metrics twice with backward compatible name, and then with tags
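For reference, a condensed sketch of the updated gating for the scheduling-delay metric in the hunk above (simplified, not a verbatim copy of _update_state):

from airflow.utils.types import DagRunType

def should_emit_schedule_delay(dag, dag_run) -> bool:
    # Emit only for scheduled (non-manual), never-cleared runs of periodic DAGs.
    return (
        dag.timetable.periodic
        and dag_run.run_type != DagRunType.MANUAL
        and dag_run.clear_number < 1
    )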
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Remove external_trigger field.

Revision ID: fbe8516980a3
Revises: e39a26ac59f6
Create Date: 2025-01-23 09:53:31.283015

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "fbe8516980a3"
down_revision = "e39a26ac59f6"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Apply remove external_trigger field."""
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_column("external_trigger")


def downgrade():
"""Unapply remove external_trigger field."""
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.add_column(sa.Column("external_trigger", sa.BOOLEAN(), autoincrement=False, nullable=True))
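Note that downgrade() only re-adds the column as a nullable boolean and does not restore its values. A hypothetical variant that also backfills the flag from run_type, assuming 'manual' is the stored value of DagRunType.MANUAL and the backend accepts a boolean comparison inside an UPDATE, could look like:

import sqlalchemy as sa
from alembic import op

def downgrade_with_backfill():
    # Hypothetical sketch: re-create the column, then rebuild it from run_type.
    with op.batch_alter_table("dag_run", schema=None) as batch_op:
        batch_op.add_column(sa.Column("external_trigger", sa.Boolean(), nullable=True))
    op.execute(sa.text("UPDATE dag_run SET external_trigger = (run_type = 'manual')"))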
22 changes: 9 additions & 13 deletions airflow/models/dag.py
@@ -181,7 +181,7 @@ def _get_model_data_interval(
return DataInterval(start, end)


def get_last_dagrun(dag_id, session, include_externally_triggered=False):
def get_last_dagrun(dag_id, session, include_manually_triggered=False):
"""
Return the last dag run for a dag, None if there was none.

@@ -190,8 +190,8 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
"""
DR = DagRun
query = select(DR).where(DR.dag_id == dag_id)
if not include_externally_triggered:
query = query.where(DR.external_trigger == expression.false())
if not include_manually_triggered:
query = query.where(DR.run_type != DagRunType.MANUAL)
query = query.order_by(DR.logical_date.desc())
return session.scalar(query.limit(1))

@@ -252,7 +252,6 @@ def _create_orm_dagrun(
logical_date: datetime | None,
data_interval: DataInterval | None,
start_date: datetime | None,
external_trigger: bool,
conf: Any,
state: DagRunState | None,
run_type: DagRunType,
@@ -267,7 +266,6 @@
run_id=run_id,
logical_date=logical_date,
start_date=start_date,
external_trigger=external_trigger,
conf=conf,
state=state,
run_type=run_type,
@@ -709,16 +707,16 @@ def iter_dagrun_infos_between(
break

@provide_session
def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False):
def get_last_dagrun(self, session=NEW_SESSION, include_manually_triggered=False):
return get_last_dagrun(
self.dag_id, session=session, include_externally_triggered=include_externally_triggered
self.dag_id, session=session, include_manually_triggered=include_manually_triggered
)

@provide_session
def has_dag_runs(self, session=NEW_SESSION, include_externally_triggered=True) -> bool:
def has_dag_runs(self, session=NEW_SESSION, include_manually_triggered=True) -> bool:
return (
get_last_dagrun(
self.dag_id, session=session, include_externally_triggered=include_externally_triggered
self.dag_id, session=session, include_manually_triggered=include_manually_triggered
)
is not None
)
@@ -1738,7 +1736,6 @@ def create_dagrun(
conf: dict | None = None,
run_type: DagRunType,
triggered_by: DagRunTriggeredByType,
external_trigger: bool = False,
dag_version: DagVersion | None = None,
state: DagRunState,
start_date: datetime | None = None,
@@ -1806,7 +1803,6 @@ def create_dagrun(
run_id=run_id,
logical_date=logical_date,
start_date=timezone.coerce_datetime(start_date),
external_trigger=external_trigger,
conf=conf,
state=state,
run_type=run_type,
@@ -2173,9 +2169,9 @@ def get_current(cls, dag_id: str, session=NEW_SESSION) -> DagModel:
return session.scalar(select(cls).where(cls.dag_id == dag_id))

@provide_session
def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False):
def get_last_dagrun(self, session=NEW_SESSION, include_manually_triggered=False):
return get_last_dagrun(
self.dag_id, session=session, include_externally_triggered=include_externally_triggered
self.dag_id, session=session, include_manually_triggered=include_manually_triggered
)

def get_is_paused(self, *, session: Session | None = None) -> bool:
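Callers that passed include_externally_triggered now use include_manually_triggered. A small usage sketch; the DAG id and session handling are illustrative:

from airflow.models.dag import DagModel
from airflow.utils.session import create_session

with create_session() as session:
    dag_model = DagModel.get_current("example_dag", session=session)
    # Include manually triggered runs when looking up the latest DagRun.
    last_run = dag_model.get_last_dagrun(session=session, include_manually_triggered=True)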
18 changes: 4 additions & 14 deletions airflow/models/dagrun.py
@@ -26,7 +26,6 @@
import re2
from sqlalchemy import (
JSON,
Boolean,
Column,
Enum,
ForeignKey,
@@ -136,7 +135,6 @@ class DagRun(Base, LoggingMixin):
_state = Column("state", String(50), default=DagRunState.QUEUED)
run_id = Column(StringID(), nullable=False)
creating_job_id = Column(Integer)
external_trigger = Column(Boolean, default=True)
run_type = Column(String(50), nullable=False)
triggered_by = Column(
Enum(DagRunTriggeredByType, native_enum=False, length=50)
@@ -232,7 +230,6 @@ def __init__(
queued_at: datetime | None | ArgNotSet = NOTSET,
logical_date: datetime | None = None,
start_date: datetime | None = None,
external_trigger: bool | None = None,
conf: Any | None = None,
state: DagRunState | None = None,
run_type: str | None = None,
@@ -254,7 +251,6 @@
self.run_id = run_id
self.logical_date = logical_date
self.start_date = start_date
self.external_trigger = external_trigger
self.conf = conf or {}
if state is not None:
self.state = state
@@ -273,7 +269,7 @@
def __repr__(self):
return (
f"<DagRun {self.dag_id} @ {self.logical_date}: {self.run_id}, state:{self.state}, "
f"queued_at: {self.queued_at}. externally triggered: {self.external_trigger}>"
f"queued_at: {self.queued_at}. run_type: {self.run_type}>"
)

@validates("run_id")
@@ -543,7 +539,6 @@ def find(
run_id: Iterable[str] | None = None,
logical_date: datetime | Iterable[datetime] | None = None,
state: DagRunState | None = None,
external_trigger: bool | None = None,
no_backfills: bool = False,
run_type: DagRunType | None = None,
session: Session = NEW_SESSION,
@@ -558,7 +553,6 @@
:param run_type: type of DagRun
:param logical_date: the logical date
:param state: the state of the dag run
:param external_trigger: whether this dag run is externally triggered
:param no_backfills: return no backfills (True), return all (False).
Defaults to False
:param session: database session
@@ -586,8 +580,6 @@
qry = qry.where(cls.logical_date <= logical_end_date)
if state:
qry = qry.where(cls.state == state)
if external_trigger is not None:
qry = qry.where(cls.external_trigger == external_trigger)
if run_type:
qry = qry.where(cls.run_type == run_type)
if no_backfills:
@@ -970,7 +962,7 @@ def recalculate(self) -> _UnfinishedStates:
msg = (
"DagRun Finished: dag_id=%s, logical_date=%s, run_id=%s, "
"run_start_date=%s, run_end_date=%s, run_duration=%s, "
"state=%s, external_trigger=%s, run_type=%s, "
"state=%s, run_type=%s, "
"data_interval_start=%s, data_interval_end=%s, dag_version_name=%s"
)
dagv = session.scalar(select(DagVersion).where(DagVersion.id == self.dag_version_id))
@@ -987,7 +979,6 @@
else None
),
self._state,
self.external_trigger,
self.run_type,
self.data_interval_start,
self.data_interval_end,
@@ -1022,7 +1013,6 @@ def _trace_dagrun(self, dagv) -> None:
if self.start_date and self.end_date
else 0,
"state": str(self._state),
"external_trigger": self.external_trigger,
"run_type": str(self.run_type),
"data_interval_start": str(self.data_interval_start),
"data_interval_end": str(self.data_interval_end),
@@ -1227,11 +1217,11 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis: lis
rid of the outliers on the stats side through dashboards tooling.

Note that the stat will only be emitted for scheduler-triggered DAG runs
(i.e. when ``external_trigger`` is *False* and ``clear_number`` is equal to 0).
(i.e. when ``run_type`` is not *MANUAL* and ``clear_number`` is equal to 0).
"""
if self.state == TaskInstanceState.RUNNING:
return
if self.external_trigger:
if self.run_type == DagRunType.MANUAL:
return
if self.clear_number > 0:
return
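With the external_trigger filter removed from DagRun.find, callers that previously passed external_trigger=True select manual runs via run_type instead; a usage sketch with an illustrative dag id:

from airflow.models.dagrun import DagRun
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType

# Previously: DagRun.find(dag_id="example_dag", external_trigger=True)
manual_runs = DagRun.find(
    dag_id="example_dag",
    run_type=DagRunType.MANUAL,
    state=DagRunState.SUCCESS,
)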