-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AIP-72: Add support to get Variables in task SDK to author tasks #45458
base: main
Are you sure you want to change the base?
Conversation
This allows me to test something like:
Advantage is that now this can be used at task level as well at DAG parsing level. The PR is pre mature, will add edge cases etc once we are OK with the general direction. |
Some ideas:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will need to support accessing Variable
at the top level of dag files at parse time too (not sure if the impl needs to change, likely not, but we should test it)
Tested it out, and yes that won't be possible because of us depending on execution time. So implementation will have to change:
|
New update: Why don't we just use the SDK client instead? We don't really have a need to rely on supervisor here as variables can be retrieved at the top level too. We should also be able to add some level of control at the API level to return / reject API requests as forbidden. When we integrate the token mechanism, we can generate one long running token for such arbitrary requests. Testing:
|
21aa3e9
to
f87beb8
Compare
except ImportError: | ||
# If not, hypothesis is false and this request is from dag level. | ||
from airflow.dag_processing.processor import COMMS_DECODER as COMMS # type: ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if this one fails too, raise
it?
# GetVariable etc -- parsing a dag can run top level code that asks for an Airflow Variable | ||
super()._handle_request(msg, log) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We wont really need this, because for cases of variables, connecitons, we will have to interact with the DB model directly. If we go to super(). _handle_request
, it brings the SDK API client into picture, which shouldn't be needed for DAG level stuff
# GetVariable etc -- parsing a dag can run top level code that asks for an Airflow Variable | ||
super()._handle_request(msg, log) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We wont really need this, because for cases of variables, connecitons, we will have to interact with the DB model directly. If we go to super(). _handle_request
, it brings the SDK API client into picture, which shouldn't be needed for DAG level stuff
Interesting that I cannot reproduce the failures locally. |
try: | ||
# We check the hypothesis if the request for variable came from task. | ||
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS as COMMS # type: ignore | ||
except ImportError: | ||
# If not, hypothesis is false and this request is from dag level. | ||
from airflow.dag_processing.processor import COMMS_DECODER as COMMS # type: ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not too happy with this one. Wondering if we can do anything better here..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The challenge here is to know if the context is dag or task level. I don't seem to find a clear distinction to point out at and use
def _get_variable(key: str, deserialize_json: bool) -> Variable: | ||
# TODO: This should probably be moved to a separate module like `airflow.sdk.execution_time.comms` | ||
# or `airflow.sdk.execution_time.variable` | ||
# A reason to not move it to `airflow.sdk.execution_time.comms` is that it | ||
# will make that module depend on Task SDK, which is not ideal because we intend to | ||
# keep Task SDK as a separate package than execution time mods. | ||
from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable | ||
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS | ||
|
||
log = structlog.get_logger(logger_name="task") | ||
SUPERVISOR_COMMS.send_request(log=log, msg=GetVariable(key=key)) | ||
msg = SUPERVISOR_COMMS.get_message() | ||
if isinstance(msg, ErrorResponse): | ||
raise AirflowRuntimeError(msg) | ||
|
||
if TYPE_CHECKING: | ||
assert isinstance(msg, VariableResult) | ||
return _convert_variable_result_to_variable(msg, deserialize_json) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is the right time to move these helpers to airflow.sdk.execution_time.variable
. We might be running into a circular import otherwise
closes: #45449
Intent
With AIP 72 coming in and for extending the task sdk to be able to write "complete" dags, we need to be able to interact with Airflow Variables: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/variables.html
Historically, this was done like this:
Either at the task level or at the DAG level. Note that "airflow.models" is used - which is what we are removing for Airflow 3 so that user code doesn't directly interact with DB models, preventing any potential hazard to the Airflow metadata DB. Instead, some user facing interfaces will be exposed to interact with Airflow entities so that we can provide a better DAG writing experience as well as be secure and reduce any risks.
The aim here is to be able to write dags with
from airflow.sdk import Variable
Key changes in the PR
definitions/variable.py
user facing interface, a "get" method has been introduced to fetch variables._get_variable(key)
which was introduced in AIP-72: Allow retrieving Variable from Task Context #45431_get_variable
, we perform a hypothesis check, we try to import "airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS", if import is possible, that means we are in execution context of a task, but if it fails, we are in execution context of a dag, so we attempt to import "airflow.dag_processing.processor import COMMS_DECODER" instead._handle_requests
to pass around the VariableResult to the dag processing process.Testing
Variable.get at dag level
DAG:
Variable:
When variable is present:
When variable isn't present (scheduler doesn't crash)
Variable.get at task level
DAG:
Variable present:
Variable not present:
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.