From 27ffc76be384a3f842e5b3d6abe2abbb85572fe3 Mon Sep 17 00:00:00 2001 From: Pierlou Ramade <48205215+Pierlou@users.noreply.github.com> Date: Wed, 4 Sep 2024 12:10:01 +0200 Subject: [PATCH] Upgrade airflow and dependencies (#44) * upgrade airflow and dependencies * update dockerfile * refactor config * update dockerfile * restore gitignore --- .gitignore | 2 +- Dockerfile | 10 +++++++--- airflow.cfg | 42 ++++++++++++++++++++---------------------- requirements.txt | 26 +++++++++++++++----------- 4 files changed, 43 insertions(+), 37 deletions(-) diff --git a/.gitignore b/.gitignore index 7c034e8..43daaba 100755 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,4 @@ pg-airflow/ ssh/ .env variables.py -.DS_Store \ No newline at end of file +.DS_Store diff --git a/Dockerfile b/Dockerfile index 6f8b1d3..7e466c5 100755 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,5 @@ -FROM apache/airflow:2.7.3-python3.10 + +FROM apache/airflow:2.10.0-python3.10 USER root @@ -23,6 +24,9 @@ RUN apt-get install lftp -y RUN apt-get install zip -y RUN apt-get install wget -y RUN apt-get install p7zip-full -y +RUN apt-get install nano -y +RUN apt-get install jq -y +RUN apt-get install libmagic1 -y RUN chown -R "airflow:root" /opt/airflow/ @@ -41,6 +45,6 @@ ADD requirements.txt /requirements.txt RUN pip install -r /requirements.txt -RUN git config --global user.email "geoffrey.aldebert@data.gouv.fr" -RUN git config --global user.name "Geoffrey Aldebert (Bot Airflow)" +RUN git config --global user.email "your email" +RUN git config --global user.name "your username" diff --git a/airflow.cfg b/airflow.cfg index 43d3a21..ae8d1af 100755 --- a/airflow.cfg +++ b/airflow.cfg @@ -23,21 +23,14 @@ default_timezone = utc # full import path to the class when using a custom executor. executor = SequentialExecutor +[database] + # The SqlAlchemy connection string to the metadata database. # SqlAlchemy supports many different database engines. # More information here: # http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri sql_alchemy_conn = sqlite:////opt/airflow/airflow.db -# The encoding for the databases -sql_engine_encoding = utf-8 - -# Collation for ``dag_id``, ``task_id``, ``key`` columns in case they have different encoding. -# This is particularly useful in case of mysql with utf8mb4 encoding because -# primary keys for XCom table has too big size and ``sql_engine_collation_for_ids`` should -# be set to ``utf8mb3_general_ci``. -# sql_engine_collation_for_ids = - # If SqlAlchemy should pool database connections. sql_alchemy_pool_enabled = True @@ -68,10 +61,18 @@ sql_alchemy_pool_recycle = 1800 # https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic sql_alchemy_pool_pre_ping = True +# The encoding for the databases +sql_engine_encoding = utf-8 + # The schema to use for the metadata database. # SqlAlchemy supports databases with the concept of multiple schemas. sql_alchemy_schema = +# Number of times the code should be retried in case of DB Operational Errors. +# Not all transactions will be retried as it can cause undesired state. +# Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``. +max_db_retries = 3 + # Import path for connect args in SqlAlchemy. Defaults to an empty dict. # This is useful when you want to configure db engine args that SqlAlchemy won't parse # in connection string. @@ -87,7 +88,7 @@ parallelism = 32 # the number of tasks that is running concurrently for a DAG, add up the number of running # tasks for all DAG runs of the DAG. This is configurable at the DAG level with ``concurrency``, # which is defaulted as ``dag_concurrency``. -dag_concurrency = 16 +max_active_tasks_per_dag = 16 # Are DAGs paused by default at creation dags_are_paused_at_creation = True @@ -126,7 +127,7 @@ fernet_key = donot_pickle = True # How long before timing out a python file import -dagbag_import_timeout = 30.0 +dagbag_import_timeout = 200.0 # Should a traceback be shown in the UI for dagbag import errors, # instead of just the exception message @@ -210,11 +211,6 @@ lazy_load_plugins = True # loaded from module. lazy_discover_providers = True -# Number of times the code should be retried in case of DB Operational Errors. -# Not all transactions will be retried as it can cause undesired state. -# Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``. -max_db_retries = 3 - # Hide sensitive Variables or Connection extra json keys from UI and task logs when set to True # # (Connection passwords are always hidden in logs) @@ -382,7 +378,7 @@ enable_experimental_api = False # Deny all : # auth_backend = airflow.api.auth.backend.deny_all # Basic Auth : -auth_backend = airflow.api.auth.backend.basic_auth +auth_backends = airflow.api.auth.backend.basic_auth # Used to set the maximum page limit for API requests @@ -677,6 +673,11 @@ sentry_dsn = # otherwise via ``CeleryExecutor`` kubernetes_queue = kubernetes +[kubernetes_executor] + +# The Kubernetes namespace where airflow workers should be created. Defaults to ``default`` +namespace = default + [celery] # This section only applies if you are using the CeleryExecutor in @@ -834,7 +835,7 @@ scheduler_heartbeat_sec = 5 num_runs = -1 # The number of seconds to wait between consecutive DAG file processing -processor_poll_interval = 1 +scheduler_idle_sleep_time = 1 # Number of seconds after which a DAG file is parsed. The DAG file is parsed every # ``min_file_process_interval`` number of seconds. Updates to DAGs are reflected after @@ -942,7 +943,7 @@ api_rev = v3 host = # Format of the log_id, which is used to query for a given tasks logs -log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number} +log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number} # Used to mark the end of a log stream for a task end_of_log_mark = end_of_log @@ -981,9 +982,6 @@ worker_container_repository = # The tag of the Kubernetes Image for the Worker to Run worker_container_tag = -# The Kubernetes namespace where airflow workers should be created. Defaults to ``default`` -namespace = default - # If True, all worker pods will be deleted upon termination delete_worker_pods = True diff --git a/requirements.txt b/requirements.txt index 2b91101..8855718 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,29 +1,33 @@ GitPython -table-schema-to-markdown==0.4.6 +table-schema-to-markdown==0.4.12 frictionless==4.25.1 jsonschema -Unidecode==1.3.2 +Unidecode==1.3.6 PyYAML geojson shapely -minio==7.1.3 -boto3==1.28.0 +minio==7.2.8 +boto3==1.35.0 emails==0.6 -pandas==1.5.3 -papermill==2.3.4 -plotly==5.6.0 +pandas==2.2.2 +papermill==2.6.0 +plotly==5.24.0 plotly_express==0.4.1 kaleido==0.2.1 ipykernel==5.5.6 nbconvert==6.5.1 -openpyxl==3.0.9 +openpyxl==3.1.5 elasticsearch==7.17.0 elasticsearch_dsl==7.4.0 -requests==2.32.0 python-dotenv==0.21.0 -swifter==1.1.3 tweepy==4.8.0 pytest==7.2.1 langdetect==1.0.9 pydantic==2.4.0 -pyproj==3.6.1 \ No newline at end of file +pyproj==3.6.1 +requests==2.32.3 +swifter==1.4.0 +rdflib==6.3.2 +feedgen==1.0.0 +duckdb==0.10.2 +python-magic==0.4.27