Setup DWH Synchronization #1071

Merged 6 commits on Jan 9, 2025
7 changes: 7 additions & 0 deletions analytics/.gitignore
@@ -0,0 +1,7 @@
*.duckdb
.duckdb
*.db
.ipynb_checkpoints/
*.csv
.tmp
.log
1 change: 1 addition & 0 deletions analytics/dagster/.telemetry/id.yaml
@@ -0,0 +1 @@
instance_id: 3535e14f-e2a7-4b35-a902-c2e428480e12
36 changes: 24 additions & 12 deletions analytics/dagster/Dockerfile
@@ -1,32 +1,44 @@
FROM python:3.10-slim

RUN pip install dagster-webserver dagster-postgres dagster-aws
ARG ZLV_HTTP_USERNAME=zlv
ARG ZLV_HTTP_PASSWORD=zlv

RUN apt-get update && apt-get install -y nginx apache2-utils
ENV ZLV_HTTP_USERNAME=$ZLV_HTTP_USERNAME
ENV ZLV_HTTP_PASSWORD=$ZLV_HTTP_PASSWORD

RUN ls
# Install dependencies
COPY requirements.txt .
COPY --from=clevercloud/clever-tools /bin/clever /usr/local/bin/clever

# Install the required system dependencies
RUN apt-get update && \
    apt-get install -y nginx apache2-utils curl gnupg && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Install Clever Tools
RUN curl -sL https://clever-tools.clever-cloud.com/releases/latest/clever-tools-latest_linux.tar.gz | tar xz -C /usr/local/bin

# Install the required Python packages
RUN pip install dagster-webserver dagster-postgres dagster-aws

# Copy configuration files and source code
COPY requirements.txt .
RUN pip install -r requirements.txt

# Set the Dagster home environment variable
ENV DAGSTER_HOME=/opt/dagster/dagster_home/

RUN mkdir -p $DAGSTER_HOME

COPY dagster.yaml workspace.yaml $DAGSTER_HOME

COPY src/ $DAGSTER_HOME

# Set the working directory
WORKDIR $DAGSTER_HOME

# Setup Nginx configuration
RUN htpasswd -cb /etc/nginx/.htpasswd zlv zlv
# Configure basic authentication for Nginx
RUN htpasswd -cb /etc/nginx/.htpasswd $ZLV_HTTP_USERNAME $ZLV_HTTP_PASSWORD
COPY docker/nginx/nginx.conf /etc/nginx/sites-available/default

# Expose port 80 for the Nginx web server
# Expose port 8080 for the Nginx web server
EXPOSE 8080

# Define the startup command
CMD nginx -g 'daemon off;' & dagster-webserver -h 0.0.0.0 -p 3000
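
Since Nginx fronts the Dagster webserver with HTTP basic auth, a quick smoke test can confirm the credentials baked in via ZLV_HTTP_USERNAME/ZLV_HTTP_PASSWORD. A minimal sketch, assuming the container's port 8080 is published locally and the default zlv/zlv credentials were kept (host, port, and credentials are illustrative):

import requests

# Hypothetical endpoint; adjust to your `docker run -p` mapping.
response = requests.get("http://localhost:8080", auth=("zlv", "zlv"), timeout=10)
print(response.status_code)  # expect 200 with valid credentials, 401 without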

32 changes: 17 additions & 15 deletions analytics/dagster/dagster.yaml
@@ -1,20 +1,22 @@
# storage:
#   postgres:
#     postgres_db:
#       username:
#         env: DAGSTER_PG_USERNAME
#       password:
#         env: DAGSTER_PG_PASSWORD
#       hostname:
#         env: DAGSTER_PG_HOST
#       db_name:
#         env: DAGSTER_PG_DB
#       port:
#         env: DAGSTER_PG_PORT
storage:
  postgres:
    postgres_db:
      username:
        env: DAGSTER_PG_USERNAME
      password:
        env: DAGSTER_PG_PASSWORD
      hostname:
        env: DAGSTER_PG_HOST
      db_name:
        env: DAGSTER_PG_DB
      port: 5432


telemetry:
  enabled: false
  enabled: true

python_logs:
  python_log_level: INFO
  managed_python_loggers:
    - my_logger
    - my_logger
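
With the storage block active, Dagster resolves each env: reference at startup, so a missing variable only surfaces when the webserver boots. A small pre-flight sketch (the variable list mirrors the config above; the script itself is illustrative):

import os

# Settings dagster.yaml pulls from the environment; the port is now hardcoded to 5432.
required = ["DAGSTER_PG_USERNAME", "DAGSTER_PG_PASSWORD", "DAGSTER_PG_HOST", "DAGSTER_PG_DB"]
missing = [name for name in required if not os.environ.get(name)]
if missing:
    raise SystemExit(f"Missing Postgres settings: {', '.join(missing)}")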

3 changes: 3 additions & 0 deletions analytics/dagster/logs/event.log
@@ -0,0 +1,3 @@
{"action": "step_success_event", "client_time": "2025-01-09 11:22:22.838835", "event_id": "7c11e930-235c-47a5-b4f6-1d724c01d08e", "elapsed_time": "None", "instance_id": "3535e14f-e2a7-4b35-a902-c2e428480e12", "metadata": {"run_id_hash": "331c48224eff1759788e40174b635f328555bb04b1a4bbd1e7eac1dd2cf39354", "step_key_hash": "5310d1bc5da6c8b581cace1e131e053834acfbe8a4647d922e7cf17ce2e6b60d", "duration_ms": 250506.41840499884}, "python_version": "3.10.16", "dagster_version": "1.9.6", "os_desc": "Linux-6.10.14-linuxkit-x86_64-with-glibc2.36", "os_platform": "Linux", "run_storage_id": "8cb599b9-bf33-4d0d-932b-b5fa375d93e0", "is_known_ci_env": false}
{"action": "step_start_event", "client_time": "2025-01-09 11:23:34.230866", "event_id": "f1fee084-6840-47f8-9f95-384e99cfe72e", "elapsed_time": "None", "instance_id": "3535e14f-e2a7-4b35-a902-c2e428480e12", "metadata": {"run_id_hash": "075dffc13840fd3bd82e6f9da3e227d2efba9d25b67d44fdf1c81067d0fca22b", "step_key_hash": "f6cd558066b27dfbfa37ee3b6b7f9b33e26320d98fd68382dc210944693ff806"}, "python_version": "3.10.16", "dagster_version": "1.9.6", "os_desc": "Linux-6.10.14-linuxkit-x86_64-with-glibc2.36", "os_platform": "Linux", "run_storage_id": "8cb599b9-bf33-4d0d-932b-b5fa375d93e0", "is_known_ci_env": false}
{"action": "step_success_event", "client_time": "2025-01-09 11:25:39.837169", "event_id": "ed9b910b-9508-4923-b945-8391067cad80", "elapsed_time": "None", "instance_id": "3535e14f-e2a7-4b35-a902-c2e428480e12", "metadata": {"run_id_hash": "075dffc13840fd3bd82e6f9da3e227d2efba9d25b67d44fdf1c81067d0fca22b", "step_key_hash": "f6cd558066b27dfbfa37ee3b6b7f9b33e26320d98fd68382dc210944693ff806", "duration_ms": 125560.43714100088}, "python_version": "3.10.16", "dagster_version": "1.9.6", "os_desc": "Linux-6.10.14-linuxkit-x86_64-with-glibc2.36", "os_platform": "Linux", "run_storage_id": "8cb599b9-bf33-4d0d-932b-b5fa375d93e0", "is_known_ci_env": false}
12 changes: 11 additions & 1 deletion analytics/dagster/requirements.txt
@@ -1,11 +1,21 @@
dagster<w
dagster
dagster-dbt
dagster-duckdb
dagster-embedded-elt
GeoAlchemy2==0.16.0
dbt-duckdb
duckdb
matplotlib
pandas
requests
dlt[duckdb]
dlt[filesystem]
dlt[parquet]
s3fs

thefuzz==0.22.1





2 changes: 1 addition & 1 deletion analytics/dagster/src/__init__.py
@@ -1 +1 @@
from .definitions import defs as defs
from .definitions import defs as defs
12 changes: 10 additions & 2 deletions analytics/dagster/src/assets/__init__.py
@@ -1,3 +1,11 @@
from .production_dlt import dagster_production_assets
# from .production_dlt import dagster_production_assets
from .production_dbt import dbt_production_assets
from .clever import clevercloud_login_and_restart
from .dwh import *  # re-export the DWH assets
from .dwh import __all__ as dwh_all

__all__ = ["dagster_production_assets"]
# __all__ = ["dagster_production_assets"]
__all__ = [
    "dbt_production_assets",
    "clevercloud_login_and_restart",
] + dwh_all

3 changes: 3 additions & 0 deletions analytics/dagster/src/assets/clever/__init__.py
@@ -0,0 +1,3 @@
from .restart import clevercloud_login_and_restart

__all__ = ["clevercloud_login_and_restart"]
16 changes: 16 additions & 0 deletions analytics/dagster/src/assets/clever/restart.py
@@ -0,0 +1,16 @@
import subprocess
from dagster import asset
from ...config import Config

@asset(group_name="deployment", deps=["upload_duckdb_to_s3"], name="clevercloud_login_and_restart")
def clevercloud_login_and_restart():
    # Log in to Clever Cloud using environment variables
    login_command = ["clever", "login", "--token", Config.CLEVER_TOKEN, "--secret", Config.CLEVER_SECRET]
    subprocess.run(login_command, check=True)

    # Restart the application
    restart_command = ["clever", "restart", "--app", Config.METABASE_APP_ID]
    subprocess.run(restart_command, check=True)

    return f"Application {Config.METABASE_APP_ID} has been restarted."
16 changes: 16 additions & 0 deletions analytics/dagster/src/assets/dwh/__init__.py
@@ -0,0 +1,16 @@
from .checks import check_ff_lovac_on_duckdb
from .copy import copy_dagster_duckdb_to_metabase_duckdb
from .ingest import import_postgres_data_from_replica_to_duckdb, import_cerema_ff_lovac_data_from_s3_to_duckdb, setup_replica_db, setup_s3_connection
from .upload import upload_duckdb_to_s3, upload_ff_to_s3, download_ff_from_s3

__all__ = [
    "check_ff_lovac_on_duckdb",
    "copy_dagster_duckdb_to_metabase_duckdb",
    "import_postgres_data_from_replica_to_duckdb",
    "import_cerema_ff_lovac_data_from_s3_to_duckdb",
    "setup_replica_db",
    "setup_s3_connection",
    "upload_duckdb_to_s3",
    "upload_ff_to_s3",
    "download_ff_from_s3",
]
3 changes: 3 additions & 0 deletions analytics/dagster/src/assets/dwh/checks/__init__.py
@@ -0,0 +1,3 @@
from .ff_table_exists import check_ff_lovac_on_duckdb

__all__ = ["check_ff_lovac_on_duckdb"]
32 changes: 32 additions & 0 deletions analytics/dagster/src/assets/dwh/checks/ff_table_exists.py
@@ -0,0 +1,32 @@
from dagster import AssetSpec, multi_asset
from ..ingest.queries.lovac import lovac_tables_sql
from ..ingest.queries.ff import ff_tables_sql
from dagster_duckdb import DuckDBResource


# One check asset per LOVAC/FF table built by the ingest step
all_tables_sql = {**lovac_tables_sql, **ff_tables_sql}


@multi_asset(
    specs=[
        AssetSpec(
            f"check_{table_name}",
            kinds={"duckdb"},
            deps=[f"build_{table_name}"],
            group_name="check",
        )
        for table_name in all_tables_sql.keys()
    ],
    can_subset=True,
)
def check_ff_lovac_on_duckdb(duckdb: DuckDBResource):
    query = "SELECT * FROM ff.lovac LIMIT 1;"
    with duckdb.get_connection() as conn:
        res = conn.execute(query)
        if res.fetchdf().empty:
            raise Exception("No data in ff.lovac table")
        else:
            return "Data found in ff.lovac table"
3 changes: 3 additions & 0 deletions analytics/dagster/src/assets/dwh/copy/__init__.py
@@ -0,0 +1,3 @@
from .copy_to_clean_duckdb import copy_dagster_duckdb_to_metabase_duckdb

__all__ = ["copy_dagster_duckdb_to_metabase_duckdb"]
62 changes: 62 additions & 0 deletions analytics/dagster/src/assets/dwh/copy/copy_to_clean_duckdb.py
@@ -0,0 +1,62 @@
from dagster import AssetKey
from ....config import RESULT_TABLES, translate_table_name
from dagster_duckdb import DuckDBResource
from dagster import AssetExecutionContext, AssetSpec, MaterializeResult, multi_asset


source_schema = "main_marts"
destination_schema = "main"


def process_specific_table(
    table_name: str, duckdb: DuckDBResource, duckdb_metabase: DuckDBResource
):
    chemin_destination_db = (
        duckdb_metabase.database
    )  # Make sure this path is correct

    source_table_name = table_name
    destination_table_name = translate_table_name(table_name)
    with duckdb.get_connection() as conn:
        conn.execute(f"ATTACH '{chemin_destination_db}' AS destination_db;")
        conn.execute(
            f"""
            CREATE OR REPLACE TABLE destination_db.{destination_schema}.{destination_table_name} AS
            SELECT * FROM {source_schema}.{source_table_name};
            """
        )

        # Detach the source database
        # conn.execute("DETACH DATABASE source_db;")
        conn.execute("DETACH DATABASE destination_db;")


@multi_asset(
    specs=[
        AssetSpec(
            f"copy_{table_name}",
            kinds={"duckdb"},
            deps=[AssetKey(["marts", table_name])],
            group_name="copy",
        )
        for table_name in RESULT_TABLES
    ],
    can_subset=True,
)
def copy_dagster_duckdb_to_metabase_duckdb(
    context: AssetExecutionContext,
    duckdb: DuckDBResource,
    duckdb_metabase: DuckDBResource,
):
    for table_name in RESULT_TABLES:
        if (
            AssetKey(f"copy_{table_name}")
            in context.op_execution_context.selected_asset_keys
        ):
            process_specific_table(
                table_name=table_name, duckdb=duckdb, duckdb_metabase=duckdb_metabase
            )
            yield MaterializeResult(asset_key=f"copy_{table_name}")
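
The copy leans on DuckDB's ATTACH so a single SQL statement can write across database files. The same pattern in plain Python, outside Dagster (file and table names are illustrative):

import duckdb

# Open the warehouse file and attach the Metabase-facing file.
conn = duckdb.connect("dagster.duckdb")
conn.execute("ATTACH 'metabase.duckdb' AS destination_db;")
conn.execute(
    "CREATE OR REPLACE TABLE destination_db.main.owners AS "
    "SELECT * FROM main_marts.owners;"
)
conn.execute("DETACH DATABASE destination_db;")
conn.close()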
9 changes: 9 additions & 0 deletions analytics/dagster/src/assets/dwh/ingest/__init__.py
@@ -0,0 +1,9 @@
from .ingest_lovac_ff_s3_asset import import_cerema_ff_lovac_data_from_s3_to_duckdb, setup_s3_connection
from .ingest_postgres_asset import import_postgres_data_from_replica_to_duckdb, setup_replica_db

__all__ = [
    "import_postgres_data_from_replica_to_duckdb",
    "import_cerema_ff_lovac_data_from_s3_to_duckdb",
    "setup_replica_db",
    "setup_s3_connection",
]
66 changes: 66 additions & 0 deletions analytics/dagster/src/assets/dwh/ingest/ingest_lovac_ff_s3_asset.py
@@ -0,0 +1,66 @@
from dagster import AssetKey, asset
from dagster_duckdb import DuckDBResource
from dagster import AssetExecutionContext, AssetSpec, MaterializeResult, multi_asset
from ....config import Config
from .queries.lovac import lovac_tables_sql
from .queries.ff import ff_tables_sql


all_tables_sql = {**lovac_tables_sql, **ff_tables_sql}


@asset(name="setup_s3_connection", description="Create the S3 secret and target schemas in DuckDB")
def setup_s3_connection(context, duckdb: DuckDBResource):
    query = f"""
    CREATE OR REPLACE PERSISTENT SECRET SECRET (
        TYPE S3,
        KEY_ID '{Config.CELLAR_ACCESS_KEY_ID}',
        SECRET '{Config.CELLAR_SECRET_ACCESS_KEY}',
        ENDPOINT '{Config.CELLAR_HOST_URL}',
        REGION '{Config.CELLAR_REGION}'
    );
    """
    with duckdb.get_connection() as conn:
        context.log.info(f"Executing SQL: {query}")
        conn.execute(query)
        schema_query = """
        CREATE SCHEMA IF NOT EXISTS ff;
        CREATE SCHEMA IF NOT EXISTS lovac;
        """
        context.log.info(f"Executing SQL: {schema_query}")
        conn.execute(schema_query)


def process_subset(name: str, context: AssetExecutionContext, duckdb: DuckDBResource):
    with duckdb.get_connection() as conn:
        command = all_tables_sql[name]
        context.log.info(f"Executing SQL: {command}")
        res = conn.execute(command)
        context.log.info(f"Result: {res.fetchdf()}")


@multi_asset(
    specs=[
        AssetSpec(
            f"build_{name}",
            deps=["setup_s3_connection"],
            kinds={"duckdb", "s3"},
            group_name="lovac" if "lovac" in name else "ff",
        )
        for name in all_tables_sql
    ],
    can_subset=True,
)
def import_cerema_ff_lovac_data_from_s3_to_duckdb(
    context: AssetExecutionContext, duckdb: DuckDBResource
):
    context.log.info("Importing FF/LOVAC data from S3 to DuckDB")
    context.log.info(f"duckdb: {duckdb}")
    for name in all_tables_sql:
        if AssetKey(f"build_{name}") in context.op_execution_context.selected_asset_keys:
            context.log.info(f"Found {name} in selected_asset_keys")
            process_subset(name, context, duckdb)
            yield MaterializeResult(asset_key=f"build_{name}")
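
The queries modules aren't part of this diff, but each entry in all_tables_sql is raw SQL executed against DuckDB once the S3 secret exists. A plausible shape for one entry, purely illustrative (bucket, path, and table name are hypothetical):

# queries/lovac.py (sketch)
lovac_tables_sql = {
    "lovac_2024": """
        CREATE OR REPLACE TABLE lovac.raw_lovac_2024 AS
        SELECT * FROM read_parquet('s3://cellar-bucket/lovac/2024/*.parquet');
    """,
}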