-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AIP-72: Add support for get_current_context
in Task SDK
#45486
Merged
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
kaxil
added
area:task-execution-interface-aip72
AIP-72: Task Execution Interface (TEI) aka Task SDK
and removed
area:providers
provider:standard
labels
Jan 8, 2025
kaxil
force-pushed
the
get-current-context
branch
2 times, most recently
from
January 8, 2025 20:10
06b7acd
to
66526e0
Compare
kaxil
force-pushed
the
get-current-context
branch
from
January 8, 2025 20:14
66526e0
to
9627d5a
Compare
kaxil
force-pushed
the
get-current-context
branch
2 times, most recently
from
January 9, 2025 06:04
9f96a41
to
407002b
Compare
uranusjr
reviewed
Jan 9, 2025
uranusjr
approved these changes
Jan 9, 2025
closes apache#45234 I am putting the logic for `set_current_context` in `execution_time/context.py`. I didn't want to put `_CURRENT_CONTEXT` in `task_sdk/src/airflow/sdk/definitions/contextmanager.py` to avoid execution logic in a user-facing module but I couldn't think of another way to store it from execution & allow retrieving (via `get_current_context` in the Standard Provider) in their Task. Upcoming PRs: - Move most of the internal stuff in Task SDK to a separate module. - Use `create_runtime_ti` fixture more widely in tests --- Tested with the following DAG: ```py import pendulum from airflow.decorators import dag, task from airflow.providers.standard.operators.python import get_current_context @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) def x_get_context(): @task def template_test(data_interval_end): context = get_current_context() # Will print `2024-10-10 00:00:00+00:00`. # Note how we didn't pass this value when calling the task. Instead # it was passed by the decorator from the context print(f"data_interval_end: {data_interval_end}") # Will print the full context dict print(f"context: {context}") template_test() x_get_context() ``` <img width="1703" alt="image" src="https://github.com/user-attachments/assets/2763963a-d299-412f-bee3-3b20904ca7c8" />
kaxil
force-pushed
the
get-current-context
branch
from
January 9, 2025 06:43
407002b
to
14de0a6
Compare
kaxil
added a commit
to astronomer/airflow
that referenced
this pull request
Jan 9, 2025
I had added `create_runtime_ti` in apache#45486, this PR modifies the fixture a little bit and uses it more broadly.
kaxil
added a commit
to astronomer/airflow
that referenced
this pull request
Jan 9, 2025
I had added `create_runtime_ti` in apache#45486, this PR modifies the fixture a little bit and uses it more broadly.
kaxil
added a commit
that referenced
this pull request
Jan 9, 2025
I had added `create_runtime_ti` in #45486, this PR modifies the fixture a little bit and uses it more broadly.
amoghrajesh
reviewed
Jan 9, 2025
Comment on lines
+101
to
+113
This fixture sets up a `RuntimeTaskInstance` with default or custom `TIRunContext` and `StartupDetails`, | ||
making it easy to simulate task execution scenarios in tests. | ||
|
||
Example usage: :: | ||
|
||
def test_custom_task_instance(create_runtime_ti): | ||
class MyTaskOperator(BaseOperator): | ||
def execute(self, context): | ||
assert context["dag_run"].run_id == "test_run" | ||
|
||
task = MyTaskOperator(task_id="test_task") | ||
ti = create_runtime_ti(task, context_from_server=make_ti_context(run_id="test_run")) | ||
# Further test logic... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Amazing!
agupta01
pushed a commit
to agupta01/airflow
that referenced
this pull request
Jan 13, 2025
closes apache#45234 I am putting the logic for `set_current_context` in `execution_time/context.py`. I didn't want to put `_CURRENT_CONTEXT` in `task_sdk/src/airflow/sdk/definitions/contextmanager.py` to avoid execution logic in a user-facing module but I couldn't think of another way to store it from execution & allow retrieving (via `get_current_context` in the Standard Provider) in their Task. Upcoming PRs: - Move most of the internal stuff in Task SDK to a separate module. - Use `create_runtime_ti` fixture more widely in tests --- Tested with the following DAG: ```py import pendulum from airflow.decorators import dag, task from airflow.providers.standard.operators.python import get_current_context @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) def x_get_context(): @task def template_test(data_interval_end): context = get_current_context() # Will print `2024-10-10 00:00:00+00:00`. # Note how we didn't pass this value when calling the task. Instead # it was passed by the decorator from the context print(f"data_interval_end: {data_interval_end}") # Will print the full context dict print(f"context: {context}") template_test() x_get_context() ``` <img width="1703" alt="image" src="https://github.com/user-attachments/assets/2763963a-d299-412f-bee3-3b20904ca7c8" />
agupta01
pushed a commit
to agupta01/airflow
that referenced
this pull request
Jan 13, 2025
I had added `create_runtime_ti` in apache#45486, this PR modifies the fixture a little bit and uses it more broadly.
karenbraganz
pushed a commit
to karenbraganz/airflow
that referenced
this pull request
Jan 13, 2025
closes apache#45234 I am putting the logic for `set_current_context` in `execution_time/context.py`. I didn't want to put `_CURRENT_CONTEXT` in `task_sdk/src/airflow/sdk/definitions/contextmanager.py` to avoid execution logic in a user-facing module but I couldn't think of another way to store it from execution & allow retrieving (via `get_current_context` in the Standard Provider) in their Task. Upcoming PRs: - Move most of the internal stuff in Task SDK to a separate module. - Use `create_runtime_ti` fixture more widely in tests --- Tested with the following DAG: ```py import pendulum from airflow.decorators import dag, task from airflow.providers.standard.operators.python import get_current_context @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) def x_get_context(): @task def template_test(data_interval_end): context = get_current_context() # Will print `2024-10-10 00:00:00+00:00`. # Note how we didn't pass this value when calling the task. Instead # it was passed by the decorator from the context print(f"data_interval_end: {data_interval_end}") # Will print the full context dict print(f"context: {context}") template_test() x_get_context() ``` <img width="1703" alt="image" src="https://github.com/user-attachments/assets/2763963a-d299-412f-bee3-3b20904ca7c8" />
karenbraganz
pushed a commit
to karenbraganz/airflow
that referenced
this pull request
Jan 13, 2025
I had added `create_runtime_ti` in apache#45486, this PR modifies the fixture a little bit and uses it more broadly.
HariGS-DB
pushed a commit
to HariGS-DB/airflow
that referenced
this pull request
Jan 16, 2025
closes apache#45234 I am putting the logic for `set_current_context` in `execution_time/context.py`. I didn't want to put `_CURRENT_CONTEXT` in `task_sdk/src/airflow/sdk/definitions/contextmanager.py` to avoid execution logic in a user-facing module but I couldn't think of another way to store it from execution & allow retrieving (via `get_current_context` in the Standard Provider) in their Task. Upcoming PRs: - Move most of the internal stuff in Task SDK to a separate module. - Use `create_runtime_ti` fixture more widely in tests --- Tested with the following DAG: ```py import pendulum from airflow.decorators import dag, task from airflow.providers.standard.operators.python import get_current_context @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) def x_get_context(): @task def template_test(data_interval_end): context = get_current_context() # Will print `2024-10-10 00:00:00+00:00`. # Note how we didn't pass this value when calling the task. Instead # it was passed by the decorator from the context print(f"data_interval_end: {data_interval_end}") # Will print the full context dict print(f"context: {context}") template_test() x_get_context() ``` <img width="1703" alt="image" src="https://github.com/user-attachments/assets/2763963a-d299-412f-bee3-3b20904ca7c8" />
HariGS-DB
pushed a commit
to HariGS-DB/airflow
that referenced
this pull request
Jan 16, 2025
I had added `create_runtime_ti` in apache#45486, this PR modifies the fixture a little bit and uses it more broadly.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Labels
area:task-execution-interface-aip72
AIP-72: Task Execution Interface (TEI) aka Task SDK
area:task-sdk
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
closes #45234
I am putting the logic for
set_current_context
inexecution_time/context.py
. I didn't want to put_CURRENT_CONTEXT
intask_sdk/src/airflow/sdk/definitions/contextmanager.py
to avoid execution logic in a user-facing module but I couldn't think of another way to store it from execution & allow retrieving (viaget_current_context
in the Standard Provider) in their Task.Upcoming PRs:
create_runtime_ti
fixture more widely in testsTested with the following DAG:
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.