Skip to content

Commit

Permalink
Switch to single key for bundle config (#45318)
Browse files Browse the repository at this point in the history
Switch the DAG bundle configuration to a single ``backends`` key that holds the list of bundle backend configs.
  • Loading branch information
dstandish authored Jan 6, 2025
1 parent 5581e65 commit 1208679
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 125 deletions.
54 changes: 31 additions & 23 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2676,30 +2676,38 @@ dag_bundles:
description: |
Configuration for the DAG bundles. This allows Airflow to load DAGs from different sources.
Airflow will consume all options added to this section. Below you will see only the default,
``dags_folder``. The option name is the bundle name and the value is a json object with the following
keys:
* classpath: The classpath of the bundle class
* kwargs: The keyword arguments to pass to the bundle class
* refresh_interval: The interval in seconds to refresh the bundle from its source.
options:
backends:
description: |
List of backend configs. Must supply name, classpath, and kwargs for each backend.
For example, to add a new bundle named ``hello`` to my Airflow instance, add the following to your
airflow.cfg (this is just an example, the classpath and kwargs are not real):
By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can
also be overridden in kwargs if desired.
.. code-block:: ini
The default is the dags folder dag bundle.
[dag_bundles]
hello: {classpath: "airflow.some.classpath", kwargs: {"hello": "world"}, refresh_interval: 60}
options:
dags_folder:
description: |
This is the default DAG bundle that loads DAGs from the traditional ``[core] dags_folder``.
By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can be
overridden here if desired.
Parsing DAGs from the DAG folder can be disabled by setting this option to an empty string.
version_added: ~
Note: As shown below, you can split your json config over multiple lines by indenting.
See configparser documentation for an example:
https://docs.python.org/3/library/configparser.html#supported-ini-file-structure.
version_added: 3.0.0
type: string
example: ~
default: '{{"classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle",
"kwargs": {{}}}}'
example: >
[
{
"name": "my-git-repo",
"classpath": "airflow.dag_processing.bundles.git.GitDagBundle",
"kwargs": {
"subdir": "dags",
"repo_url": "git@github.com:example.com/my-dags.git",
"tracking_ref": "main",
"refresh_interval": 0
}
}
]
default: >
[
{{
"name": "dags-folder",
"classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle",
"kwargs": {{}}
}}
]
4 changes: 4 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ def _write_option_header(
if example is not None and include_examples:
if extra_spacing:
file.write("#\n")
example_lines = example.splitlines()
example = "\n# ".join(example_lines)
file.write(f"# Example: {option} = {example}\n")
needs_separation = True
if include_sources and sources_dict:
Expand Down Expand Up @@ -553,6 +555,8 @@ def _write_value(
file.write(f"# {option} = \n")
else:
if comment_out_everything:
value_lines = value.splitlines()
value = "\n# ".join(value_lines)
file.write(f"# {option} = {value}\n")
else:
file.write(f"{option} = {value}\n")
Expand Down
93 changes: 55 additions & 38 deletions airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
from collections.abc import Iterable

from sqlalchemy.orm import Session

from airflow.dag_processing.bundles.base import BaseDagBundle
Expand All @@ -34,52 +36,57 @@
class DagBundlesManager(LoggingMixin):
"""Manager for DAG bundles."""

@property
def bundle_configs(self) -> dict[str, dict]:
"""Get all DAG bundle configurations."""
configured_bundles = conf.getsection("dag_bundles")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._bundle_config = {}
self.parse_config()

if not configured_bundles:
return {}
def parse_config(self) -> None:
"""
Get all DAG bundle configurations and store in instance variable.
# If dags_folder is empty string, we remove it. This allows the default dags_folder bundle to be disabled.
if not configured_bundles["dags_folder"]:
del configured_bundles["dags_folder"]
If a bundle class for a given name has already been imported, it will not be imported again.
dict_bundles: dict[str, dict] = {}
for key in configured_bundles.keys():
config = conf.getjson("dag_bundles", key)
if not isinstance(config, dict):
raise AirflowConfigException(f"Bundle config for {key} is not a dict: {config}")
dict_bundles[key] = config
todo (AIP-66): proper validation of the bundle configuration so we have better error messages
return dict_bundles
:meta private:
"""
if self._bundle_config:
return

backends = conf.getjson("dag_bundles", "backends")

if not backends:
return

if not isinstance(backends, list):
raise AirflowConfigException(
"Bundle config is not a list. Check config value"
" for section `dag_bundles` and key `backends`."
)
seen = set()
for cfg in backends:
name = cfg["name"]
if name in seen:
raise ValueError(f"Dag bundle {name} is configured twice.")
seen.add(name)
class_ = import_string(cfg["classpath"])
kwargs = cfg["kwargs"]
self._bundle_config[name] = (class_, kwargs)

@provide_session
def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
known_bundles = {b.name: b for b in session.query(DagBundleModel).all()}

for name in self.bundle_configs.keys():
if bundle := known_bundles.get(name):
stored = {b.name: b for b in session.query(DagBundleModel).all()}
for name in self._bundle_config.keys():
if bundle := stored.pop(name, None):
bundle.active = True
else:
session.add(DagBundleModel(name=name))
self.log.info("Added new DAG bundle %s to the database", name)

for name, bundle in known_bundles.items():
if name not in self.bundle_configs:
bundle.active = False
self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name)

def get_all_dag_bundles(self) -> list[BaseDagBundle]:
"""
Get all DAG bundles.
:param session: A database session.
:return: list of DAG bundles.
"""
return [self.get_bundle(name, version=None) for name in self.bundle_configs.keys()]
for name, bundle in stored.items():
bundle.active = False
self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name)

def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:
"""
Expand All @@ -90,7 +97,17 @@ def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:
:return: The DAG bundle.
"""
# TODO: proper validation of the bundle configuration so we have better error messages
bundle_config = self.bundle_configs[name]
bundle_class = import_string(bundle_config["classpath"])
return bundle_class(name=name, version=version, **bundle_config["kwargs"])
cfg_tuple = self._bundle_config.get(name)
if not cfg_tuple:
raise ValueError(f"Requested bundle '{name}' is not configured.")
class_, kwargs = cfg_tuple
return class_(name=name, version=version, **kwargs)

def get_all_dag_bundles(self) -> Iterable[BaseDagBundle]:
"""
Lazily instantiate and yield every configured DAG bundle.

Each bundle is built from the ``(class, kwargs)`` pair stored under its name in
``self._bundle_config`` and is created with ``version=None``.

:return: an iterable (generator) of DAG bundle instances, one per configured bundle name.
"""
for name, (class_, kwargs) in self._bundle_config.items():
yield class_(name=name, version=None, **kwargs)
21 changes: 18 additions & 3 deletions docs/exts/includes/sections-and-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,15 @@
{% endif %}

:Type: {{ option["type"] }}
:Default: ``{{ "''" if option["default"] == "" else option["default"] }}``
:Default:
{% set default = option["default"] %}
{% if default and "\n" in default %}
.. code-block::
{{ default }}
{% else %}
``{{ "''" if default == "" else default }}``
{% endif %}
{% if option.get("sensitive") %}
:Environment Variables:
``AIRFLOW__{{ section_name | replace(".", "_") | upper }}__{{ option_name | upper }}``
Expand All @@ -71,9 +79,16 @@
{% else %}
:Environment Variable: ``AIRFLOW__{{ section_name | replace(".", "_") | upper }}__{{ option_name | upper }}``
{% endif %}
{% if option["example"] %}
{% set example = option["example"] %}
{% if example %}
:Example:
``{{ option["example"] }}``
{% if "\n" in example %}
.. code-block::
{{ example }}
{% else %}
``{{ example }}``
{% endif %}
{% endif %}
{% endfor %}

Expand Down
3 changes: 2 additions & 1 deletion tests/cli/commands/remote_commands/test_config_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ def test_cli_comment_out_everything(self):
)
output = temp_stdout.getvalue()
lines = output.splitlines()
assert all(not line.strip() or line.startswith(("#", "[")) for line in lines if line)
bad_lines = [l for l in lines if l and not (not l.strip() or l.startswith(("#", "[")))] # noqa: E741
assert bad_lines == []


class TestCliConfigGetValue:
Expand Down
Loading

0 comments on commit 1208679

Please sign in to comment.