Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to single key for bundle config #45318

Merged
merged 10 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 31 additions & 23 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2676,30 +2676,38 @@ dag_bundles:
description: |
Configuration for the DAG bundles. This allows Airflow to load DAGs from different sources.

Airflow will consume all options added to this section. Below you will see only the default,
``dags_folder``. The option name is the bundle name and the value is a json object with the following
keys:

* classpath: The classpath of the bundle class
* kwargs: The keyword arguments to pass to the bundle class
* refresh_interval: The interval in seconds to refresh the bundle from its source.
options:
backends:
description: |
List of backend configs. Must supply name, classpath, and kwargs for each backend.

For example, to add a new bundle named ``hello`` to my Airflow instance, add the following to your
airflow.cfg (this is just an example, the classpath and kwargs are not real):
By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can
also be overridden in kwargs if desired.

.. code-block:: ini
The default is the dags folder dag bundle.

[dag_bundles]
hello: {classpath: "airflow.some.classpath", kwargs: {"hello": "world"}, refresh_interval: 60}
options:
dags_folder:
description: |
This is the default DAG bundle that loads DAGs from the traditional ``[core] dags_folder``.
By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can be
overridden here if desired.
Parsing DAGs from the DAG folder can be disabled by setting this option to an empty string.
version_added: ~
Note: As shown below, you can split your json config over multiple lines by indenting.
See configparser documentation for an example:
https://docs.python.org/3/library/configparser.html#supported-ini-file-structure.
version_added: 3.0.0
type: string
example: ~
default: '{{"classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle",
"kwargs": {{}}}}'
example: >
[
{
"name": "my-git-repo",
"classpath": "airflow.dag_processing.bundles.git.GitDagBundle",
"kwargs": {
"subdir": "dags",
"repo_url": "git@github.com:example.com/my-dags.git",
"tracking_ref": "main",
"refresh_interval": 0
}
}
]
default: >
[
{{
"name": "dags-folder",
dstandish marked this conversation as resolved.
Show resolved Hide resolved
"classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle",
"kwargs": {{}}
}}
]
4 changes: 4 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ def _write_option_header(
if example is not None and include_examples:
if extra_spacing:
file.write("#\n")
example_lines = example.splitlines()
example = "\n# ".join(example_lines)
file.write(f"# Example: {option} = {example}\n")
needs_separation = True
if include_sources and sources_dict:
Expand Down Expand Up @@ -553,6 +555,8 @@ def _write_value(
file.write(f"# {option} = \n")
else:
if comment_out_everything:
value_lines = value.splitlines()
value = "\n# ".join(value_lines)
file.write(f"# {option} = {value}\n")
else:
file.write(f"{option} = {value}\n")
Expand Down
93 changes: 55 additions & 38 deletions airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
from collections.abc import Iterable

from sqlalchemy.orm import Session

from airflow.dag_processing.bundles.base import BaseDagBundle
Expand All @@ -34,52 +36,57 @@
class DagBundlesManager(LoggingMixin):
"""Manager for DAG bundles."""

@property
def bundle_configs(self) -> dict[str, dict]:
"""Get all DAG bundle configurations."""
configured_bundles = conf.getsection("dag_bundles")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._bundle_config = {}
self.parse_config()

if not configured_bundles:
return {}
def parse_config(self) -> None:
"""
Get all DAG bundle configurations and store in instance variable.

# If dags_folder is empty string, we remove it. This allows the default dags_folder bundle to be disabled.
if not configured_bundles["dags_folder"]:
del configured_bundles["dags_folder"]
If a bundle class for a given name has already been imported, it will not be imported again.

dict_bundles: dict[str, dict] = {}
for key in configured_bundles.keys():
config = conf.getjson("dag_bundles", key)
if not isinstance(config, dict):
raise AirflowConfigException(f"Bundle config for {key} is not a dict: {config}")
dict_bundles[key] = config
todo (AIP-66): proper validation of the bundle configuration so we have better error messages

return dict_bundles
:meta private:
"""
if self._bundle_config:
return

backends = conf.getjson("dag_bundles", "backends")

if not backends:
return

if not isinstance(backends, list):
raise AirflowConfigException(
"Bundle config is not a list. Check config value"
" for section `dag_bundles` and key `backends`."
)
seen = set()
for cfg in backends:
name = cfg["name"]
if name in seen:
raise ValueError(f"Dag bundle {name} is configured twice.")
seen.add(name)
class_ = import_string(cfg["classpath"])
kwargs = cfg["kwargs"]
self._bundle_config[name] = (class_, kwargs)

@provide_session
def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
known_bundles = {b.name: b for b in session.query(DagBundleModel).all()}

for name in self.bundle_configs.keys():
if bundle := known_bundles.get(name):
stored = {b.name: b for b in session.query(DagBundleModel).all()}
for name in self._bundle_config.keys():
if bundle := stored.pop(name, None):
bundle.active = True
else:
session.add(DagBundleModel(name=name))
self.log.info("Added new DAG bundle %s to the database", name)

for name, bundle in known_bundles.items():
if name not in self.bundle_configs:
bundle.active = False
self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name)

def get_all_dag_bundles(self) -> list[BaseDagBundle]:
"""
Get all DAG bundles.

:param session: A database session.

:return: list of DAG bundles.
"""
return [self.get_bundle(name, version=None) for name in self.bundle_configs.keys()]
for name, bundle in stored.items():
bundle.active = False
self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name)

def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:
"""
Expand All @@ -90,7 +97,17 @@ def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:

:return: The DAG bundle.
"""
# TODO: proper validation of the bundle configuration so we have better error messages
bundle_config = self.bundle_configs[name]
bundle_class = import_string(bundle_config["classpath"])
return bundle_class(name=name, version=version, **bundle_config["kwargs"])
cfg_tuple = self._bundle_config.get(name)
if not cfg_tuple:
raise ValueError(f"Requested bundle '{name}' is not configured.")
class_, kwargs = cfg_tuple
return class_(name=name, version=version, **kwargs)

def get_all_dag_bundles(self) -> Iterable[BaseDagBundle]:
    """
    Lazily instantiate every configured DAG bundle.

    This is a generator: a bundle object is only created when the caller
    advances the iteration. Each bundle is built from the class and keyword
    arguments stored under its name in ``self._bundle_config`` and is
    created with ``version=None``.

    :return: An iterable yielding one DAG bundle instance per configured
        bundle, in the iteration order of ``self._bundle_config``.
    """
    for name, (class_, kwargs) in self._bundle_config.items():
        yield class_(name=name, version=None, **kwargs)
21 changes: 18 additions & 3 deletions docs/exts/includes/sections-and-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,15 @@
{% endif %}

:Type: {{ option["type"] }}
:Default: ``{{ "''" if option["default"] == "" else option["default"] }}``
:Default:
{% set default = option["default"] %}
{% if default and "\n" in default %}
.. code-block::

{{ default }}
{% else %}
``{{ "''" if default == "" else default }}``
{% endif %}
{% if option.get("sensitive") %}
:Environment Variables:
``AIRFLOW__{{ section_name | replace(".", "_") | upper }}__{{ option_name | upper }}``
Expand All @@ -71,9 +79,16 @@
{% else %}
:Environment Variable: ``AIRFLOW__{{ section_name | replace(".", "_") | upper }}__{{ option_name | upper }}``
{% endif %}
{% if option["example"] %}
{% set example = option["example"] %}
{% if example %}
:Example:
``{{ option["example"] }}``
{% if "\n" in example %}
.. code-block::

{{ example }}
{% else %}
``{{ example }}``
{% endif %}
{% endif %}
{% endfor %}

Expand Down
3 changes: 2 additions & 1 deletion tests/cli/commands/remote_commands/test_config_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ def test_cli_comment_out_everything(self):
)
output = temp_stdout.getvalue()
lines = output.splitlines()
assert all(not line.strip() or line.startswith(("#", "[")) for line in lines if line)
bad_lines = [l for l in lines if l and not (not l.strip() or l.startswith(("#", "[")))] # noqa: E741
assert bad_lines == []


class TestCliConfigGetValue:
Expand Down
Loading
Loading