From ab67c84d835aea538783d3e684d6b70e4e647b10 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 31 Dec 2024 15:08:41 -0800 Subject: [PATCH 01/10] Switch to single key for bundle config --- airflow/config_templates/config.yml | 26 +++-- airflow/dag_processing/bundles/manager.py | 95 +++++++++++-------- .../bundles/test_dag_bundle_manager.py | 73 +++++++++----- 3 files changed, 121 insertions(+), 73 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 1a2d664955ddd..ba87c60a72df1 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2692,14 +2692,26 @@ dag_bundles: [dag_bundles] hello: {classpath: "airflow.some.classpath", kwargs: {"hello": "world"}, refresh_interval: 60} options: - dags_folder: + backends: description: | - This is the default DAG bundle that loads DAGs from the traditional ``[core] dags_folder``. + List of backend configs. Must supply name, classpath, and kwargs for each backend. + By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can be overridden here if desired. + Parsing DAGs from the DAG folder can be disabled by setting this option to an empty string. - version_added: ~ - type: string - example: ~ - default: '{{"classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle", - "kwargs": {{}}}}' + version_added: 3.0 + type: object + example: > + [ + { + "name": "my-git-repo", + "classpath": "airflow.dag_processing.bundles.git.GitDagBundle", + "kwargs": { + "subdir": "dags", + "repo_url": "git@github.com:example.com/my-dags.git", + "tracking_ref": "main", + "refresh_interval": 0 + } + ]' + default: '[{{"name": "dags-folder", "classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle", "kwargs": {{}}}}]' diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index 4f8b59b956e18..85c402d9909ff 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -16,10 +16,10 @@ # under the License. from __future__ import annotations +from collections.abc import Iterable from typing import TYPE_CHECKING from airflow.configuration import conf -from airflow.exceptions import AirflowConfigException from airflow.models.dagbundle import DagBundleModel from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.module_loading import import_string @@ -30,56 +30,54 @@ from airflow.dag_processing.bundles.base import BaseDagBundle +_bundle_config = {} + class DagBundlesManager(LoggingMixin): """Manager for DAG bundles.""" - @property - def bundle_configs(self) -> dict[str, dict]: - """Get all DAG bundle configurations.""" - configured_bundles = conf.getsection("dag_bundles") - - if not configured_bundles: - return {} - - # If dags_folder is empty string, we remove it. This allows the default dags_folder bundle to be disabled. - if not configured_bundles["dags_folder"]: - del configured_bundles["dags_folder"] + def parse_config(self) -> None: + """ + Get all DAG bundle configurations and store in module variable. - dict_bundles: dict[str, dict] = {} - for key in configured_bundles.keys(): - config = conf.getjson("dag_bundles", key) - if not isinstance(config, dict): - raise AirflowConfigException(f"Bundle config for {key} is not a dict: {config}") - dict_bundles[key] = config + If a bundle class for a given name has already been imported, it will not be imported again. + """ + configured_bundles = conf.getjson("dag_bundles", "backends") - return dict_bundles + if not configured_bundles: + return + + seen = set() + for cfg in configured_bundles: + name = cfg["name"] + if name in seen: + raise ValueError(f"Dag bundle {name} is configured twice.") + seen.add(name) + if name in _bundle_config: + continue + class_ = import_string(cfg["classpath"]) + kwargs = cfg["kwargs"] + _bundle_config[name] = (class_, kwargs) + + # remove obsolete bundle configs + for name, cfg in _bundle_config.items(): + if name not in seen: + _bundle_config.pop(name) @provide_session def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None: - known_bundles = {b.name: b for b in session.query(DagBundleModel).all()} - - for name in self.bundle_configs.keys(): - if bundle := known_bundles.get(name): + self.parse_config() + stored = {b.name: b for b in session.query(DagBundleModel).all()} + for name in _bundle_config.keys(): + if bundle := stored.pop(name, None): bundle.active = True else: session.add(DagBundleModel(name=name)) self.log.info("Added new DAG bundle %s to the database", name) - for name, bundle in known_bundles.items(): - if name not in self.bundle_configs: - bundle.active = False - self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name) - - def get_all_dag_bundles(self) -> list[BaseDagBundle]: - """ - Get all DAG bundles. - - :param session: A database session. - - :return: list of DAG bundles. - """ - return [self.get_bundle(name, version=None) for name in self.bundle_configs.keys()] + for name, bundle in stored.items(): + bundle.active = False + self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name) def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle: """ @@ -90,7 +88,22 @@ def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle: :return: The DAG bundle. """ - # TODO: proper validation of the bundle configuration so we have better error messages - bundle_config = self.bundle_configs[name] - bundle_class = import_string(bundle_config["classpath"]) - return bundle_class(name=name, version=version, **bundle_config["kwargs"]) + # todo (AIP-66): proper validation of the bundle configuration so we have better error messages + cfg_tuple = _bundle_config.get(name) + if not cfg_tuple: + self.parse_config() + cfg_tuple = _bundle_config.get(name) + if not cfg_tuple: + raise ValueError(f"Requested bundle '{name}' is not configured.") + class_, kwargs = cfg_tuple + return class_(name=name, version=version, **kwargs) + + def get_all_dag_bundles(self) -> Iterable[BaseDagBundle]: + """ + Get all DAG bundles. + + :return: list of DAG bundles. + """ + self.parse_config() + for name, (class_, kwargs) in _bundle_config.items(): + yield class_(name=name, version=None, **kwargs) diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py index c79acfc2ed2a5..0c4de43676acc 100644 --- a/tests/dag_processing/bundles/test_dag_bundle_manager.py +++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py @@ -37,12 +37,12 @@ [ pytest.param({}, {"dags_folder"}, id="no_config"), pytest.param( - {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": "{}"}, {"testbundle", "dags_folder"}, id="add_bundle" + {"AIRFLOW__DAG_BUNDLES__BACKENDS": "{}"}, {"my-test-bundle", "dags_folder"}, id="add_bundle" ), pytest.param({"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": ""}, set(), id="remove_dags_folder_default"), pytest.param( - {"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": "", "AIRFLOW__DAG_BUNDLES__TESTBUNDLE": "{}"}, - {"testbundle"}, + {"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": "", "AIRFLOW__DAG_BUNDLES__BACKENDS": "{}"}, + {"my-test-bundle"}, id="remove_dags_folder_default_add_bundle", ), ], @@ -58,14 +58,14 @@ def test_bundle_configs_property(envs, expected_names): @pytest.mark.parametrize( "config,message", [ - pytest.param("1", "Bundle config for testbundle is not a dict: 1", id="int"), - pytest.param("[]", r"Bundle config for testbundle is not a dict: \[\]", id="list"), + pytest.param("1", "Bundle config for my-test-bundle is not a dict: 1", id="int"), + pytest.param("[]", r"Bundle config for my-test-bundle is not a dict: \[\]", id="list"), pytest.param("abc", r"Unable to parse .* as valid json", id="not_json"), ], ) def test_bundle_configs_property_raises(config, message): bundle_manager = DagBundlesManager() - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": config}): + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": config}): with pytest.raises(AirflowConfigException, match=message): bundle_manager.bundle_configs @@ -81,10 +81,13 @@ def path(self): pass -BASIC_BUNDLE_CONFIG = { - "classpath": "tests.dag_processing.bundles.test_dag_bundle_manager.BasicBundle", - "kwargs": {"refresh_interval": 1}, -} +BASIC_BUNDLE_CONFIG = [ + { + "name": "my-test-bundle", + "classpath": "tests.dag_processing.bundles.test_dag_bundle_manager.BasicBundle", + "kwargs": {"refresh_interval": 1}, + } +] def test_get_bundle(): @@ -92,18 +95,20 @@ def test_get_bundle(): bundle_manager = DagBundlesManager() - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): - bundle = bundle_manager.get_bundle(name="testbundle", version="hello") + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): + with pytest.raises(ValueError, match="'bundle-that-doesn't-exist' is not configured"): + bundle = bundle_manager.get_bundle(name="bundle-that-doesn't-exist", version="hello") + bundle = bundle_manager.get_bundle(name="my-test-bundle", version="hello") assert isinstance(bundle, BasicBundle) - assert bundle.name == "testbundle" + assert bundle.name == "my-test-bundle" assert bundle.version == "hello" assert bundle.refresh_interval == 1 # And none for version also works! - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): - bundle = bundle_manager.get_bundle(name="testbundle") + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundle = bundle_manager.get_bundle(name="my-test-bundle") assert isinstance(bundle, BasicBundle) - assert bundle.name == "testbundle" + assert bundle.name == "my-test-bundle" assert bundle.version is None @@ -112,13 +117,25 @@ def test_get_all_dag_bundles(): bundle_manager = DagBundlesManager() - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): - bundles = bundle_manager.get_all_dag_bundles() - assert len(bundles) == 2 + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundles = list(bundle_manager.get_all_dag_bundles()) + assert len(bundles) == 1 assert all(isinstance(x, BaseDagBundle) for x in bundles) bundle_names = {x.name for x in bundles} - assert bundle_names == {"testbundle", "dags_folder"} + assert bundle_names == {"my-test-bundle"} + + +def test_get_all_dag_bundles_default(): + """Test that get_all_dag_bundles returns all bundles.""" + + bundle_manager = DagBundlesManager() + bundles = list(bundle_manager.get_all_dag_bundles()) + assert len(bundles) == 1 + assert all(isinstance(x, BaseDagBundle) for x in bundles) + + bundle_names = {x.name for x in bundles} + assert bundle_names == {"dags-folder"} @pytest.fixture @@ -139,15 +156,21 @@ def _get_bundle_names_and_active(): ) # Initial add - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): bundle_manager.sync_bundles_to_db() - assert _get_bundle_names_and_active() == [("dags_folder", True), ("testbundle", True)] + assert _get_bundle_names_and_active() == [("dags_folder", True), ("my-test-bundle", True)] # Disable ones that disappear from config bundle_manager.sync_bundles_to_db() - assert _get_bundle_names_and_active() == [("dags_folder", True), ("testbundle", False)] + assert _get_bundle_names_and_active() == [("dags_folder", True), ("my-test-bundle", False)] # Re-enable one that reappears in config - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): bundle_manager.sync_bundles_to_db() - assert _get_bundle_names_and_active() == [("dags_folder", True), ("testbundle", True)] + assert _get_bundle_names_and_active() == [("dags_folder", True), ("my-test-bundle", True)] + + +# import yaml +# from pathlib import Path +# d = yaml.safe_load(Path("/Users/dstandish/code/airflow/airflow/config_templates/config.yml").open()) +# d["dag_bundles"] From d93efb9b42aebcd0a2972a3afceb1094449ec61f Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 31 Dec 2024 15:34:32 -0800 Subject: [PATCH 02/10] fix tests --- airflow/dag_processing/bundles/manager.py | 8 +++- .../bundles/test_dag_bundle_manager.py | 42 ++++++++++++------- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index 85c402d9909ff..9e97c7c8449f7 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -20,6 +20,7 @@ from typing import TYPE_CHECKING from airflow.configuration import conf +from airflow.exceptions import AirflowConfigException from airflow.models.dagbundle import DagBundleModel from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.module_loading import import_string @@ -47,6 +48,11 @@ def parse_config(self) -> None: if not configured_bundles: return + if not isinstance(configured_bundles, list): + raise AirflowConfigException( + "Bundle config is not a list. Check config value" + " for section `dag_bundles` and key `backends`." + ) seen = set() for cfg in configured_bundles: name = cfg["name"] @@ -60,7 +66,7 @@ def parse_config(self) -> None: _bundle_config[name] = (class_, kwargs) # remove obsolete bundle configs - for name, cfg in _bundle_config.items(): + for name in list(_bundle_config.keys()): if name not in seen: _bundle_config.pop(name) diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py index 0c4de43676acc..d0ae79b6df4df 100644 --- a/tests/dag_processing/bundles/test_dag_bundle_manager.py +++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py @@ -35,14 +35,23 @@ @pytest.mark.parametrize( "envs,expected_names", [ - pytest.param({}, {"dags_folder"}, id="no_config"), + pytest.param({}, {"dags-folder"}, id="default"), pytest.param( - {"AIRFLOW__DAG_BUNDLES__BACKENDS": "{}"}, {"my-test-bundle", "dags_folder"}, id="add_bundle" + {"AIRFLOW__DAG_BUNDLES__BACKENDS": "[]"}, + {"dags-folder"}, + id="empty-list", ), - pytest.param({"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": ""}, set(), id="remove_dags_folder_default"), pytest.param( - {"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": "", "AIRFLOW__DAG_BUNDLES__BACKENDS": "{}"}, - {"my-test-bundle"}, + {"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": ""}, + {"dags-folder"}, + id="empty-string", + ), + pytest.param( + { + "AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": "", + "AIRFLOW__DAG_BUNDLES__BACKENDS": "{}", + }, + {"dags-folder"}, id="remove_dags_folder_default_add_bundle", ), ], @@ -51,23 +60,28 @@ def test_bundle_configs_property(envs, expected_names): """Test that bundle_configs are read from configuration.""" bundle_manager = DagBundlesManager() with patch.dict(os.environ, envs): - names = set(bundle_manager.bundle_configs.keys()) + bundle_manager.parse_config() + names = set(x.name for x in bundle_manager.get_all_dag_bundles()) assert names == expected_names @pytest.mark.parametrize( "config,message", [ - pytest.param("1", "Bundle config for my-test-bundle is not a dict: 1", id="int"), - pytest.param("[]", r"Bundle config for my-test-bundle is not a dict: \[\]", id="list"), - pytest.param("abc", r"Unable to parse .* as valid json", id="not_json"), + pytest.param("1", "Bundle config is not a list", id="int"), + pytest.param("[]", None, id="list"), + pytest.param("{}", None, id="dict"), + pytest.param("abc", "Unable to parse .* as valid json", id="not_json"), ], ) def test_bundle_configs_property_raises(config, message): bundle_manager = DagBundlesManager() with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": config}): - with pytest.raises(AirflowConfigException, match=message): - bundle_manager.bundle_configs + if message: + with pytest.raises(AirflowConfigException, match=message): + bundle_manager.parse_config() + else: + bundle_manager.parse_config() class BasicBundle(BaseDagBundle): @@ -158,16 +172,16 @@ def _get_bundle_names_and_active(): # Initial add with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): bundle_manager.sync_bundles_to_db() - assert _get_bundle_names_and_active() == [("dags_folder", True), ("my-test-bundle", True)] + assert _get_bundle_names_and_active() == [("my-test-bundle", True)] # Disable ones that disappear from config bundle_manager.sync_bundles_to_db() - assert _get_bundle_names_and_active() == [("dags_folder", True), ("my-test-bundle", False)] + assert _get_bundle_names_and_active() == [("dags-folder", True), ("my-test-bundle", False)] # Re-enable one that reappears in config with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): bundle_manager.sync_bundles_to_db() - assert _get_bundle_names_and_active() == [("dags_folder", True), ("my-test-bundle", True)] + assert _get_bundle_names_and_active() == [("dags-folder", False), ("my-test-bundle", True)] # import yaml From ae79276a0ce20b77f7ea863344b842eb97421396 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 2 Jan 2025 12:48:03 -0800 Subject: [PATCH 03/10] fix version added? --- airflow/config_templates/config.yml | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index ba87c60a72df1..7fa16a73cc4dc 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2700,8 +2700,8 @@ dag_bundles: overridden here if desired. Parsing DAGs from the DAG folder can be disabled by setting this option to an empty string. - version_added: 3.0 - type: object + version_added: 3.0.0 + type: string example: > [ { @@ -2714,4 +2714,11 @@ dag_bundles: "refresh_interval": 0 } ]' - default: '[{{"name": "dags-folder", "classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle", "kwargs": {{}}}}]' + default: > + [ + {{ + "name": "dags-folder", + "classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle", + "kwargs": {{}} + }} + ] From 75d05176eb2cd0bb129f37307b0365cc4d141ac2 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 2 Jan 2025 13:35:18 -0800 Subject: [PATCH 04/10] fix config --- airflow/config_templates/config.yml | 35 ++++++++++++++++------------- airflow/configuration.py | 2 ++ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 7fa16a73cc4dc..9aa89c43ae86b 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2699,26 +2699,29 @@ dag_bundles: By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can be overridden here if desired. + Note: you can split your json config over multiple lines by indenting. See configparser documentation + for an example: https://docs.python.org/3/library/configparser.html#supported-ini-file-structure. + Parsing DAGs from the DAG folder can be disabled by setting this option to an empty string. version_added: 3.0.0 type: string example: > [ - { - "name": "my-git-repo", - "classpath": "airflow.dag_processing.bundles.git.GitDagBundle", - "kwargs": { - "subdir": "dags", - "repo_url": "git@github.com:example.com/my-dags.git", - "tracking_ref": "main", - "refresh_interval": 0 - } - ]' + { + "name": "my-git-repo", + "classpath": "airflow.dag_processing.bundles.git.GitDagBundle", + "kwargs": { + "subdir": "dags", + "repo_url": "git@github.com:example.com/my-dags.git", + "tracking_ref": "main", + "refresh_interval": 0 + } + ] default: > [ - {{ - "name": "dags-folder", - "classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle", - "kwargs": {{}} - }} - ] + {{ + "name": "dags-folder", + "classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle", + "kwargs": {{}} + }} + ] diff --git a/airflow/configuration.py b/airflow/configuration.py index f5b2f8f7d5328..4fac359573ddd 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -515,6 +515,8 @@ def _write_option_header( if example is not None and include_examples: if extra_spacing: file.write("#\n") + example_lines = example.splitlines() + example = "\n# ".join(example_lines) file.write(f"# Example: {option} = {example}\n") needs_separation = True if include_sources and sources_dict: From 2556afa92056de58d3885e8302fbc530a6422ac9 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 2 Jan 2025 14:21:31 -0800 Subject: [PATCH 05/10] comment out everything must handle multi line values --- airflow/configuration.py | 2 ++ tests/cli/commands/remote_commands/test_config_command.py | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow/configuration.py b/airflow/configuration.py index 4fac359573ddd..1ea8948483175 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -555,6 +555,8 @@ def _write_value( file.write(f"# {option} = \n") else: if comment_out_everything: + value_lines = value.splitlines() + value = "\n# ".join(value_lines) file.write(f"# {option} = {value}\n") else: file.write(f"{option} = {value}\n") diff --git a/tests/cli/commands/remote_commands/test_config_command.py b/tests/cli/commands/remote_commands/test_config_command.py index f932b1851d227..b697f6d91cfc0 100644 --- a/tests/cli/commands/remote_commands/test_config_command.py +++ b/tests/cli/commands/remote_commands/test_config_command.py @@ -206,7 +206,8 @@ def test_cli_comment_out_everything(self): ) output = temp_stdout.getvalue() lines = output.splitlines() - assert all(not line.strip() or line.startswith(("#", "[")) for line in lines if line) + bad_lines = [l for l in lines if l and not (not l.strip() or l.startswith(("#", "[")))] # noqa: E741 + assert bad_lines == [] class TestCliConfigGetValue: From 64efa619d3821df3ec525389852c767f15eb146a Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 3 Jan 2025 09:39:12 -0800 Subject: [PATCH 06/10] fix config docs rendering --- airflow/config_templates/config.yml | 30 ++++++--------------- docs/exts/includes/sections-and-options.rst | 21 ++++++++++++--- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 9aa89c43ae86b..5890e43de6c87 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2676,33 +2676,19 @@ dag_bundles: description: | Configuration for the DAG bundles. This allows Airflow to load DAGs from different sources. - Airflow will consume all options added to this section. Below you will see only the default, - ``dags_folder``. The option name is the bundle name and the value is a json object with the following - keys: - - * classpath: The classpath of the bundle class - * kwargs: The keyword arguments to pass to the bundle class - * refresh_interval: The interval in seconds to refresh the bundle from its source. - - For example, to add a new bundle named ``hello`` to my Airflow instance, add the following to your - airflow.cfg (this is just an example, the classpath and kwargs are not real): - - .. code-block:: ini - - [dag_bundles] - hello: {classpath: "airflow.some.classpath", kwargs: {"hello": "world"}, refresh_interval: 60} options: backends: description: | List of backend configs. Must supply name, classpath, and kwargs for each backend. - By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can be - overridden here if desired. + By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can + also be overridden in kwargs if desired. - Note: you can split your json config over multiple lines by indenting. See configparser documentation - for an example: https://docs.python.org/3/library/configparser.html#supported-ini-file-structure. + The default is the dags folder dag bundle. - Parsing DAGs from the DAG folder can be disabled by setting this option to an empty string. + Note: As shown below, you can split your json config over multiple lines by indenting. + See configparser documentation for an example: + https://docs.python.org/3/library/configparser.html#supported-ini-file-structure. version_added: 3.0.0 type: string example: > @@ -2716,7 +2702,7 @@ dag_bundles: "tracking_ref": "main", "refresh_interval": 0 } - ] + ] default: > [ {{ @@ -2724,4 +2710,4 @@ dag_bundles: "classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle", "kwargs": {{}} }} - ] + ] diff --git a/docs/exts/includes/sections-and-options.rst b/docs/exts/includes/sections-and-options.rst index f191cf10d5579..f9bbf5c83d5fe 100644 --- a/docs/exts/includes/sections-and-options.rst +++ b/docs/exts/includes/sections-and-options.rst @@ -60,7 +60,15 @@ {% endif %} :Type: {{ option["type"] }} - :Default: ``{{ "''" if option["default"] == "" else option["default"] }}`` + :Default: + {% set default = option["default"] %} + {% if default and "\n" in default %} + .. code-block:: + + {{ default }} + {% else %} + ``{{ "''" if default == "" else default }}`` + {% endif %} {% if option.get("sensitive") %} :Environment Variables: ``AIRFLOW__{{ section_name | replace(".", "_") | upper }}__{{ option_name | upper }}`` @@ -71,9 +79,16 @@ {% else %} :Environment Variable: ``AIRFLOW__{{ section_name | replace(".", "_") | upper }}__{{ option_name | upper }}`` {% endif %} - {% if option["example"] %} + {% set example = option["example"] %} + {% if example %} :Example: - ``{{ option["example"] }}`` + {% if "\n" in example %} + .. code-block:: + + {{ example }} + {% else %} + ``{{ example }}`` + {% endif %} {% endif %} {% endfor %} From 599e3cb1e15d36a9140aadc73c9dc7261d0eef76 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 6 Jan 2025 11:31:07 -0800 Subject: [PATCH 07/10] cache on class; assume that config will not change --- airflow/dag_processing/bundles/manager.py | 43 ++++++------ .../bundles/test_dag_bundle_manager.py | 66 ++++++++++--------- 2 files changed, 53 insertions(+), 56 deletions(-) diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index 9e97c7c8449f7..21641649e6d05 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -31,50 +31,50 @@ from airflow.dag_processing.bundles.base import BaseDagBundle -_bundle_config = {} - class DagBundlesManager(LoggingMixin): """Manager for DAG bundles.""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._bundle_config = {} + self.parse_config() + def parse_config(self) -> None: """ Get all DAG bundle configurations and store in module variable. If a bundle class for a given name has already been imported, it will not be imported again. + + todo (AIP-66): proper validation of the bundle configuration so we have better error messages """ - configured_bundles = conf.getjson("dag_bundles", "backends") + if self._bundle_config: + return - if not configured_bundles: + backends = conf.getjson("dag_bundles", "backends") + + if not backends: return - if not isinstance(configured_bundles, list): + if not isinstance(backends, list): raise AirflowConfigException( "Bundle config is not a list. Check config value" " for section `dag_bundles` and key `backends`." ) seen = set() - for cfg in configured_bundles: + for cfg in backends: name = cfg["name"] if name in seen: raise ValueError(f"Dag bundle {name} is configured twice.") seen.add(name) - if name in _bundle_config: - continue class_ = import_string(cfg["classpath"]) kwargs = cfg["kwargs"] - _bundle_config[name] = (class_, kwargs) - - # remove obsolete bundle configs - for name in list(_bundle_config.keys()): - if name not in seen: - _bundle_config.pop(name) + self._bundle_config[name] = (class_, kwargs) @provide_session def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None: - self.parse_config() stored = {b.name: b for b in session.query(DagBundleModel).all()} - for name in _bundle_config.keys(): + for name in self._bundle_config.keys(): if bundle := stored.pop(name, None): bundle.active = True else: @@ -94,13 +94,9 @@ def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle: :return: The DAG bundle. """ - # todo (AIP-66): proper validation of the bundle configuration so we have better error messages - cfg_tuple = _bundle_config.get(name) + cfg_tuple = self._bundle_config.get(name) if not cfg_tuple: - self.parse_config() - cfg_tuple = _bundle_config.get(name) - if not cfg_tuple: - raise ValueError(f"Requested bundle '{name}' is not configured.") + raise ValueError(f"Requested bundle '{name}' is not configured.") class_, kwargs = cfg_tuple return class_(name=name, version=version, **kwargs) @@ -110,6 +106,5 @@ def get_all_dag_bundles(self) -> Iterable[BaseDagBundle]: :return: list of DAG bundles. """ - self.parse_config() - for name, (class_, kwargs) in _bundle_config.items(): + for name, (class_, kwargs) in self._bundle_config.items(): yield class_(name=name, version=None, **kwargs) diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py index d0ae79b6df4df..674a704b1e1b2 100644 --- a/tests/dag_processing/bundles/test_dag_bundle_manager.py +++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py @@ -38,29 +38,37 @@ pytest.param({}, {"dags-folder"}, id="default"), pytest.param( {"AIRFLOW__DAG_BUNDLES__BACKENDS": "[]"}, - {"dags-folder"}, + set(), id="empty-list", ), pytest.param( - {"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": ""}, - {"dags-folder"}, - id="empty-string", + { + "AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps( + [ + { + "name": "my-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"local_folder": "/tmp/hihi", "refresh_interval": 1}, + } + ] + ) + }, + {"my-bundle"}, + id="remove_dags_folder_default_add_bundle", ), pytest.param( { - "AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": "", - "AIRFLOW__DAG_BUNDLES__BACKENDS": "{}", + "AIRFLOW__DAG_BUNDLES__BACKENDS": "[]", }, - {"dags-folder"}, - id="remove_dags_folder_default_add_bundle", + set(), + id="remove_dags_folder_default", ), ], ) -def test_bundle_configs_property(envs, expected_names): +def test_parse_bundle_config(envs, expected_names): """Test that bundle_configs are read from configuration.""" - bundle_manager = DagBundlesManager() with patch.dict(os.environ, envs): - bundle_manager.parse_config() + bundle_manager = DagBundlesManager() names = set(x.name for x in bundle_manager.get_all_dag_bundles()) assert names == expected_names @@ -75,13 +83,12 @@ def test_bundle_configs_property(envs, expected_names): ], ) def test_bundle_configs_property_raises(config, message): - bundle_manager = DagBundlesManager() with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": config}): if message: with pytest.raises(AirflowConfigException, match=message): - bundle_manager.parse_config() + DagBundlesManager() else: - bundle_manager.parse_config() + DagBundlesManager() class BasicBundle(BaseDagBundle): @@ -107,11 +114,11 @@ def path(self): def test_get_bundle(): """Test that get_bundle builds and returns a bundle.""" - bundle_manager = DagBundlesManager() - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundle_manager = DagBundlesManager() + with pytest.raises(ValueError, match="'bundle-that-doesn't-exist' is not configured"): - bundle = bundle_manager.get_bundle(name="bundle-that-doesn't-exist", version="hello") + bundle_manager.get_bundle(name="bundle-that-doesn't-exist", version="hello") bundle = bundle_manager.get_bundle(name="my-test-bundle", version="hello") assert isinstance(bundle, BasicBundle) assert bundle.name == "my-test-bundle" @@ -129,10 +136,9 @@ def test_get_bundle(): def test_get_all_dag_bundles(): """Test that get_all_dag_bundles returns all bundles.""" - bundle_manager = DagBundlesManager() - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): - bundles = list(bundle_manager.get_all_dag_bundles()) + manager = DagBundlesManager() + bundles = list(manager.get_all_dag_bundles()) assert len(bundles) == 1 assert all(isinstance(x, BaseDagBundle) for x in bundles) @@ -161,8 +167,6 @@ def clear_db(): @pytest.mark.db_test def test_sync_bundles_to_db(clear_db): - bundle_manager = DagBundlesManager() - def _get_bundle_names_and_active(): with create_session() as session: return ( @@ -171,20 +175,18 @@ def _get_bundle_names_and_active(): # Initial add with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): - bundle_manager.sync_bundles_to_db() + manager = DagBundlesManager() + manager.sync_bundles_to_db() assert _get_bundle_names_and_active() == [("my-test-bundle", True)] - # Disable ones that disappear from config - bundle_manager.sync_bundles_to_db() + # simulate bundle config change + # note: airflow will detect config changes when they are in env vars + manager = DagBundlesManager() + manager.sync_bundles_to_db() assert _get_bundle_names_and_active() == [("dags-folder", True), ("my-test-bundle", False)] # Re-enable one that reappears in config with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): - bundle_manager.sync_bundles_to_db() + manager = DagBundlesManager() + manager.sync_bundles_to_db() assert _get_bundle_names_and_active() == [("dags-folder", False), ("my-test-bundle", True)] - - -# import yaml -# from pathlib import Path -# d = yaml.safe_load(Path("/Users/dstandish/code/airflow/airflow/config_templates/config.yml").open()) -# d["dag_bundles"] From 8c4e8646e1391b4bb6025f9527f1705e9ec6714f Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 6 Jan 2025 11:34:30 -0800 Subject: [PATCH 08/10] meta private --- airflow/dag_processing/bundles/manager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index 21641649e6d05..8bc8b87506cbf 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -42,11 +42,13 @@ def __init__(self, *args, **kwargs): def parse_config(self) -> None: """ - Get all DAG bundle configurations and store in module variable. + Get all DAG bundle configurations and store in instance variable. If a bundle class for a given name has already been imported, it will not be imported again. todo (AIP-66): proper validation of the bundle configuration so we have better error messages + + :meta private: """ if self._bundle_config: return From 6b85d63c3dcdee938941a16a8dcb3abe6424a92c Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 6 Jan 2025 12:29:47 -0800 Subject: [PATCH 09/10] combine tests --- .../bundles/test_dag_bundle_manager.py | 92 ++++++------------- 1 file changed, 29 insertions(+), 63 deletions(-) diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py index 674a704b1e1b2..d9bc286181f28 100644 --- a/tests/dag_processing/bundles/test_dag_bundle_manager.py +++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py @@ -19,6 +19,7 @@ import json import os +from contextlib import nullcontext from unittest.mock import patch import pytest @@ -33,62 +34,52 @@ @pytest.mark.parametrize( - "envs,expected_names", + "value, expected", [ - pytest.param({}, {"dags-folder"}, id="default"), + pytest.param(None, {"dags-folder"}, id="default"), + pytest.param("{}", set(), id="empty dict"), pytest.param( - {"AIRFLOW__DAG_BUNDLES__BACKENDS": "[]"}, + "[]", set(), - id="empty-list", + id="empty list", ), pytest.param( - { - "AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps( - [ - { - "name": "my-bundle", - "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", - "kwargs": {"local_folder": "/tmp/hihi", "refresh_interval": 1}, - } - ] - ) - }, + json.dumps( + [ + { + "name": "my-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"local_folder": "/tmp/hihi", "refresh_interval": 1}, + } + ] + ), {"my-bundle"}, id="remove_dags_folder_default_add_bundle", ), pytest.param( - { - "AIRFLOW__DAG_BUNDLES__BACKENDS": "[]", - }, + "[]", set(), id="remove_dags_folder_default", ), + pytest.param("1", "Bundle config is not a list", id="int"), + pytest.param("abc", "Unable to parse .* as valid json", id="not_json"), ], ) -def test_parse_bundle_config(envs, expected_names): +def test_parse_bundle_config(value, expected): """Test that bundle_configs are read from configuration.""" - with patch.dict(os.environ, envs): + envs = {"AIRFLOW__DAG_BUNDLES__BACKENDS": value} if value else {} + cm = nullcontext() + exp_fail = False + if isinstance(expected, str): + exp_fail = True + cm = pytest.raises(AirflowConfigException, match=expected) + + with patch.dict(os.environ, envs), cm: bundle_manager = DagBundlesManager() names = set(x.name for x in bundle_manager.get_all_dag_bundles()) - assert names == expected_names - -@pytest.mark.parametrize( - "config,message", - [ - pytest.param("1", "Bundle config is not a list", id="int"), - pytest.param("[]", None, id="list"), - pytest.param("{}", None, id="dict"), - pytest.param("abc", "Unable to parse .* as valid json", id="not_json"), - ], -) -def test_bundle_configs_property_raises(config, message): - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": config}): - if message: - with pytest.raises(AirflowConfigException, match=message): - DagBundlesManager() - else: - DagBundlesManager() + if not exp_fail: + assert names == expected class BasicBundle(BaseDagBundle): @@ -133,31 +124,6 @@ def test_get_bundle(): assert bundle.version is None -def test_get_all_dag_bundles(): - """Test that get_all_dag_bundles returns all bundles.""" - - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): - manager = DagBundlesManager() - bundles = list(manager.get_all_dag_bundles()) - assert len(bundles) == 1 - assert all(isinstance(x, BaseDagBundle) for x in bundles) - - bundle_names = {x.name for x in bundles} - assert bundle_names == {"my-test-bundle"} - - -def test_get_all_dag_bundles_default(): - """Test that get_all_dag_bundles returns all bundles.""" - - bundle_manager = DagBundlesManager() - bundles = list(bundle_manager.get_all_dag_bundles()) - assert len(bundles) == 1 - assert all(isinstance(x, BaseDagBundle) for x in bundles) - - bundle_names = {x.name for x in bundles} - assert bundle_names == {"dags-folder"} - - @pytest.fixture def clear_db(): clear_db_dag_bundles() From adc2f48e79b5bd1208cb1c5b4dbd06d7864d484a Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 6 Jan 2025 14:40:25 -0800 Subject: [PATCH 10/10] move to type checking block --- airflow/dag_processing/bundles/manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index 8bc8b87506cbf..2eaba73148571 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -16,7 +16,6 @@ # under the License. from __future__ import annotations -from collections.abc import Iterable from typing import TYPE_CHECKING from airflow.configuration import conf @@ -27,6 +26,8 @@ from airflow.utils.session import NEW_SESSION, provide_session if TYPE_CHECKING: + from collections.abc import Iterable + from sqlalchemy.orm import Session from airflow.dag_processing.bundles.base import BaseDagBundle