Skip to content

Commit

Permalink
Re-attach logic - final fixes (#4064)
Browse files Browse the repository at this point in the history
  • Loading branch information
masipauskas authored Nov 26, 2024
1 parent 594c4aa commit c7a2a4e
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
7 changes: 6 additions & 1 deletion docs/python_airflow_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This class provides integration with Airflow and Armada
## armada.operators.armada module


### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, dry_run=False, \*\*kwargs)
### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, dry_run=False, reattach_policy=None, \*\*kwargs)
Bases: `BaseOperator`, `LoggingMixin`

An Airflow operator that manages Job submission to Armada.
Expand Down Expand Up @@ -60,6 +60,9 @@ and handles job cancellation if the Airflow task is killed.
* **dry_run** (*bool*) –


* **reattach_policy** (*Optional**[**str**] **| **Callable**[**[**JobState**, **str**]**, **bool**]*) –



#### execute(context)
Submits the job to Armada and polls for completion.
Expand Down Expand Up @@ -167,6 +170,8 @@ acknowledged by Armada.
:type job_acknowledgement_timeout: int
:param dry_run: Run Operator in dry-run mode - render Armada request and terminate.
:type dry_run: bool
:param reattach_policy: Operator reattach policy to use (defaults to: never)
:type reattach_policy: Optional[str] | Callable[[JobState, str], bool]
:param kwargs: Additional keyword arguments to pass to the BaseOperator.


Expand Down
36 changes: 23 additions & 13 deletions third_party/airflow/armada/operators/armada.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ class ArmadaOperator(BaseOperator, LoggingMixin):
:type job_acknowledgement_timeout: int
:param dry_run: Run Operator in dry-run mode - render Armada request and terminate.
:type dry_run: bool
:param reattach_policy: Operator reattach policy to use (defaults to: always)
:type reattach_policy: Optional[str]
:param reattach_policy: Operator reattach policy to use (defaults to: never)
:type reattach_policy: Optional[str] | Callable[[JobState, str], bool]
:param kwargs: Additional keyword arguments to pass to the BaseOperator.
"""

Expand Down Expand Up @@ -135,7 +135,7 @@ def __init__(
dry_run: bool = conf.getboolean(
"armada_operator", "default_dry_run", fallback=False
),
reattach_policy: Optional[str] = None,
reattach_policy: Optional[str] | Callable[[JobState, str], bool] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -154,14 +154,21 @@ def __init__(
self.dry_run = dry_run
self.job_context = None

configured_reattach_policy: str = resolve_parameter_value(
"reattach_policy", reattach_policy, kwargs, "never"
)
self.log.info(
f"Configured reattach policy to: '{configured_reattach_policy}',"
f" max retries: {self.retries}"
)
self.reattach_policy = policy(configured_reattach_policy)
if reattach_policy is callable(reattach_policy):
self.log.info(
f"Configured reattach policy with callable',"
f" max retries: {self.retries}"
)
self.reattach_policy = reattach_policy
else:
configured_reattach_policy: str = resolve_parameter_value(
"reattach_policy", reattach_policy, kwargs, "never"
)
self.log.info(
f"Configured reattach policy to: '{configured_reattach_policy}',"
f" max retries: {self.retries}"
)
self.reattach_policy = policy(configured_reattach_policy)

if self.container_logs and self.k8s_token_retriever is None:
self.log.warning(
Expand Down Expand Up @@ -342,8 +349,11 @@ def _try_reattach_to_running_job(
self, context: Context
) -> Optional[RunningJobContext]:
# On first try we intentionally do not re-attach.
self.log.info(context)
if context["ti"].try_number == 1:
new_run = (
context["ti"].max_tries - context["ti"].try_number + 1
== context["ti"].task.retries
)
if new_run:
return None

expected_job_uri = external_job_uri(context)
Expand Down
4 changes: 2 additions & 2 deletions third_party/airflow/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ build-backend = "setuptools.build_meta"

[project]
name = "armada_airflow"
version = "1.0.10"
version = "1.0.11"
description = "Armada Airflow Operator"
readme='README.md'
authors = [{name = "Armada-GROSS", email = "[email protected]"}]
license = { text = "Apache Software License" }
dependencies=[
'armada-client>=0.4.7',
'armada-client>=0.4.8',
'apache-airflow>=2.6.3',
'types-protobuf==4.24.0.1',
'kubernetes>=23.6.0',
Expand Down

0 comments on commit c7a2a4e

Please sign in to comment.