Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REFACTOR][CONVENTION COLLECTIVE] Use DataProcessor #453

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 0 additions & 68 deletions workflows/data_pipelines/convcollective/DAG.py

This file was deleted.

12 changes: 12 additions & 0 deletions workflows/data_pipelines/convcollective/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Documentation

## data_processing_convention_collective

| Information | Valeur |
| -------- | -------- |
| Fichier source | `dag.py` |
| Description | Ce traitement permet de synthétiser les données sur les conventions collectives des établissements. |
| Fréquence | Quotidienne |
| Données sources | [Liste des conventions collectives par entreprise (SIRET)](https://www.data.gouv.fr/fr/datasets/5e7201d522f2a43e9f736a9a/) |
| Données de sortie | Minio |
| Channel Tchap d'information | ~annuaire-entreprises-tech |
22 changes: 22 additions & 0 deletions workflows/data_pipelines/convcollective/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from dag_datalake_sirene.config import (
MINIO_BASE_URL,
DATA_GOUV_BASE_URL,
DataSourceConfig,
)

# data.gouv.fr resource id of the SIRET -> convention collective (IDCC) file.
_RESOURCE_ID = "a22e54f7-b937-4483-9a72-aad2ea1316f1"
# Local working directory for this data source.
_TMP_FOLDER = f"{DataSourceConfig.base_tmp_folder}/convention_collective"

# Pipeline configuration for the convention collective data source:
# where to download the raw CSV, where to stage it locally, and where the
# processed output and its metadata live on MinIO.
CONVENTION_COLLECTIVE_CONFIG = DataSourceConfig(
    name="convention_collective",
    tmp_folder=_TMP_FOLDER,
    minio_path="convention_collective",
    file_name="convention_collective",
    files_to_download={
        "convention_collective": {
            "url": f"{DATA_GOUV_BASE_URL}{_RESOURCE_ID}",
            "resource_id": _RESOURCE_ID,
            "destination": f"{_TMP_FOLDER}/convention_collective-download.csv",
        }
    },
    url_minio=f"{MINIO_BASE_URL}convention_collective/latest/convention_collective.csv",
    url_minio_metadata=f"{MINIO_BASE_URL}convention_collective/latest/metadata.json",
    file_output=f"{_TMP_FOLDER}/convention_collective.csv",
)
70 changes: 70 additions & 0 deletions workflows/data_pipelines/convcollective/dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from datetime import timedelta

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

from dag_datalake_sirene.helpers import Notification
from dag_datalake_sirene.config import EMAIL_LIST
from dag_datalake_sirene.workflows.data_pipelines.convcollective.processor import (
ConventionCollectiveProcessor,
)


# Shared Airflow task defaults: alert by e-mail on failure (not on retry)
# and allow one retry per task before the run is marked failed.
default_args = dict(
    depends_on_past=False,
    email_on_failure=True,
    email_on_retry=False,
    email=EMAIL_LIST,
    retries=1,
)


@dag(
    tags=["convention collective"],
    default_args=default_args,
    schedule_interval="0 16 * * *",  # daily at 16:00
    start_date=days_ago(8),
    dagrun_timeout=timedelta(minutes=60),
    params={},
    catchup=False,
    # Both callbacks post to the Tchap channel (success and failure).
    on_failure_callback=Notification.send_notification_tchap,
    on_success_callback=Notification.send_notification_tchap,
)
def data_processing_convention_collective():
    """Daily DAG that refreshes the convention collective dataset.

    Pipeline: wipe the temp folder, download the source CSV, aggregate it
    (see ConventionCollectiveProcessor.preprocess_data), record the source's
    last-modified date, upload the result to MinIO, then compare against the
    previously stored file. Each step delegates to the processor; task ids
    are derived from the nested function names.
    """
    convention_collective_processor = ConventionCollectiveProcessor()

    @task.bash
    def clean_previous_outputs():
        # Start from an empty temp folder so stale files never leak into a run.
        return f"rm -rf {convention_collective_processor.config.tmp_folder} && mkdir -p {convention_collective_processor.config.tmp_folder}"

    @task
    def download_data():
        return convention_collective_processor.download_data()

    @task
    def preprocess_data():
        return convention_collective_processor.preprocess_data()

    @task
    def save_date_last_modified():
        return convention_collective_processor.save_date_last_modified()

    @task
    def send_file_to_minio():
        return convention_collective_processor.send_file_to_minio()

    @task
    def compare_files_minio():
        return convention_collective_processor.compare_files_minio()

    # Strictly linear dependency chain: each task runs only after the
    # previous one succeeded.
    (
        clean_previous_outputs()
        >> download_data()
        >> preprocess_data()
        >> save_date_last_modified()
        >> send_file_to_minio()
        >> compare_files_minio()
    )


# Instantiate the DAG so the Airflow scheduler discovers it at import time.
data_processing_convention_collective()
65 changes: 65 additions & 0 deletions workflows/data_pipelines/convcollective/processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import pandas as pd

from dag_datalake_sirene.helpers import DataProcessor, Notification
from dag_datalake_sirene.workflows.data_pipelines.convcollective.config import (
CONVENTION_COLLECTIVE_CONFIG,
)


class ConventionCollectiveProcessor(DataProcessor):
    """Processor for the convention collective (collective agreement) source.

    Downloads the SIRET -> IDCC mapping published on data.gouv.fr and
    aggregates it at both the establishment (SIRET) and legal-unit (SIREN)
    levels before the result is sent to MinIO.
    """

    def __init__(self) -> None:
        super().__init__(CONVENTION_COLLECTIVE_CONFIG)

    def preprocess_data(self) -> None:
        """Build the aggregated convention collective CSV.

        Reads the downloaded CSV and writes ``self.config.file_output`` with
        one row per SIRET containing:
        - ``liste_idcc_etablissement``: IDCC list for the establishment,
        - ``sirets_par_idcc``: per-SIREN mapping ``{idcc: [sirets]}``,
        - ``liste_idcc_unite_legale``: unique IDCC list for the legal unit.

        Finally pushes the number of distinct SIREN to XCom for the
        notification message.
        """
        df_conv_coll = (
            pd.read_csv(
                self.config.files_to_download["convention_collective"]["destination"],
                dtype="string",
                names=["mois", "siret", "idcc", "date_maj"],
                header=0,
            )
            .assign(
                # SIREN is the first 9 digits of the SIRET.
                siren=lambda x: x["siret"].str[0:9],
                # Normalize IDCC codes by stripping embedded spaces.
                idcc=lambda x: x["idcc"].str.replace(" ", ""),
            )
            .dropna(subset=["siret"])
        )

        # List of unique idcc per siren
        df_list_cc = (
            df_conv_coll.groupby(by=["siren"])["idcc"]
            .unique()
            .apply(list)
            .reset_index(name="liste_idcc_unite_legale")
        )

        # list of idcc per siret
        df_list_cc_per_siret = (
            df_conv_coll.groupby(by=["siret", "siren"])["idcc"]
            .apply(list)
            .reset_index(name="liste_idcc_etablissement")
        )

        # Mapping of idcc to the list of relevant siret, per siren.
        # itertuples() + setdefault instead of the previous iterrows() loop:
        # same insertion order and same result, but much faster and without
        # the per-row Series construction iterrows() incurs.
        siret_idcc_dict: dict[str, dict[str, list[str]]] = {}
        for siren, group in df_conv_coll.groupby("siren"):
            idcc_siret_dict: dict[str, list[str]] = {}
            for row in group.itertuples(index=False):
                idcc_siret_dict.setdefault(row.idcc, []).append(row.siret)
            siret_idcc_dict[siren] = idcc_siret_dict

        df_list_cc_per_siren = pd.DataFrame(
            siret_idcc_dict.items(), columns=["siren", "sirets_par_idcc"]
        )

        # Assemble the final frame: one row per siret, enriched with the
        # per-siren aggregates.
        df_cc = df_list_cc_per_siret.merge(
            df_list_cc_per_siren, on="siren", how="left"
        ).merge(df_list_cc, on="siren", how="left")

        df_cc.to_csv(self.config.file_output, index=False)

        # Report the number of distinct legal units for the Tchap notification.
        DataProcessor.push_unique_count(df_cc.siren, Notification.notification_xcom_key)
144 changes: 0 additions & 144 deletions workflows/data_pipelines/convcollective/task_functions.py

This file was deleted.

Loading